%reload_ext autoreload
%autoreload 2
%matplotlib inline
#export
from fastai import *
from fastai.vision import *
# Root of the CamVid segmentation dataset (expects `images/`, `labels/`,
# `codes.txt` and `valid.txt` inside).
path = Path('data/camvid')
path.ls()
path_lbl = path/'labels'  # per-pixel label masks, one PNG per input image
path_img = path/'images'  # the input photos
fnames = get_image_files(path_img)
fnames[:5]
path_lbl.ls()[:5]
# Peek at one input image.
img_f = fnames[0]
img = open_image(img_f)
img.show(figsize=(5,5))
# codes.txt has one class name per line; a mask pixel's value is an index into this array.
codes = np.loadtxt(path/'codes.txt', dtype=str)
codes
def get_y_fn(fn): return path_lbl/f'{fn.name[:-4]}_P.png'
# Open the mask paired with the first image; masks are images of class indices.
mask = open_mask(get_y_fn(img_f))
mask.show(figsize=(5,5), alpha=1)
mask.data
# valid.txt lists the file names reserved for the validation split.
valid_fns = np.loadtxt(path/'valid.txt', dtype=str)
valid_fns[:5]
valid_fns = [path_img/o for o in valid_fns]
# Training files are everything not listed in valid.txt.
train_fns = list(set(fnames)-set(valid_fns))
y_train_fns = [get_y_fn(o) for o in train_fns]
y_valid_fns = [get_y_fn(o) for o in valid_fns]
len(train_fns),len(valid_fns),len(y_train_fns),len(y_valid_fns)
size=128  # side length the images/masks get resized to
bs=32     # batch size
train_ds = SegmentationDataset(train_fns, y_train_fns, classes=codes)
valid_ds = SegmentationDataset(valid_fns, y_valid_fns, classes=codes)
train_tfms,valid_tfms = get_transforms()
# tfm_y=True applies the same spatial transforms to the target masks.
train_tds = DatasetTfm(train_ds, train_tfms, size=size, tfm_y=True)
valid_tds = DatasetTfm(valid_ds, valid_tfms, size=size, tfm_y=True)
data = DataBunch.create(train_tds, valid_tds, bs=bs)
#export
class ItemList():
    "A collection of items with `__len__` and `__getitem__` with `ndarray` indexing semantics"
    def __init__(self, items:Iterator):
        # Materialize into an ndarray so slices/fancy indexing work like numpy.
        self.items = np.array(list(items))
    def __len__(self)->int: return len(self.items)
    def __getitem__(self,i:int)->Any:
        # Bug fix: the original body lacked `return`, so every lookup yielded None.
        return self.items[i]
    def __repr__(self)->str: return f'{self.__class__.__name__} ({len(self)} items)\n{self.items}'
class PathItemList(ItemList):
    "An `ItemList` that also remembers the root `path` its items came from."
    def __init__(self, items:Iterator, path:PathOrStr='.'):
        # Normalize `path` before delegating the item storage to `ItemList`.
        self.path = Path(path)
        super().__init__(items)
    def __repr__(self)->str:
        base = super().__repr__()
        return f'{base}\nPath: {self.path}'
def join_path(fname:PathOrStr, path:PathOrStr='.')->Path:
    "Prefix `fname` with `path`; `path` defaults to the current directory."
    return Path(path)/fname
def join_paths(fnames:FilePathList, path:PathOrStr='.')->Collection[Path]:
    "Apply `join_path` to every name in `fnames` against the same root `path`."
    root = Path(path)
    return [join_path(fname, root) for fname in fnames]
def loadtxt_str(path:PathOrStr)->np.ndarray:
    "Load the lines of the text file at `path` as an `ndarray` of `str`."
    return np.loadtxt(str(path), dtype=str)
#export
def _df_to_fns_labels(df:pd.DataFrame, fn_col:int=0, label_col:int=1,
label_delim:str=None, suffix:Optional[str]=None):
"""Get image file names in `fn_col` by adding `suffix` and labels in `label_col` from `df`.
If `label_delim` is specified, splits the values in `label_col` accordingly.
"""
if label_delim:
df.iloc[:,label_col] = list(csv.reader(df.iloc[:,label_col], delimiter=label_delim))
labels = df.iloc[:,label_col].values
fnames = df.iloc[:,fn_col].str.lstrip()
if suffix: fnames = fnames + suffix
return fnames.values, labels
#export
class ImageFileList(PathItemList):
    "A `PathItemList` of image files, with several ways to attach labels."
    @classmethod
    def from_folder(cls, path:PathOrStr='.', check_ext:bool=True, recurse=True)->'ImageFileList':
        "Collect all image files under `path` (recursively by default)."
        return cls(get_image_files(path, check_ext=check_ext, recurse=recurse), path)
    def label_from_func(self, func:Callable)->Collection:
        "Label each file with `func(fname)`."
        return LabelList([(o,func(o)) for o in self.items], self.path)
    def label_from_re(self, pat:str, full_path:bool=False)->Collection:
        "Label each file with group 1 of regex `pat` applied to its name (or full path)."
        pat = re.compile(pat)
        def _inner(o):
            s = str(o if full_path else o.name)
            res = pat.search(s)
            assert res,f'Failed to find "{pat}" in "{s}"'
            return res.group(1)
        return self.label_from_func(_inner)
    def label_from_df(self, df, fn_col:int=0, label_col:int=1, sep:str=None, folder:PathOrStr='.',
                      suffix:str=None)->Collection:
        "Label files from the columns of `df`; rows whose file is not in this list are dropped."
        fnames, labels = _df_to_fns_labels(df, fn_col, label_col, sep, suffix)
        fnames = join_paths(fnames, self.path/Path(folder))
        # Fix: `np.object` was deprecated in NumPy 1.20 and removed in 1.24;
        # plain `object` keeps multi-label lists intact as 1-element object arrays.
        return LabelList([(fn, np.array(lbl, dtype=object)) for fn, lbl in zip(fnames, labels) if fn in self.items],
                         self.path)
    def label_from_csv(self, csv_fname, header:Optional[Union[int,str]]='infer', fn_col:int=0, label_col:int=1,
                       sep:str=None, folder:PathOrStr='.', suffix:str=None)->Collection:
        "Read `csv_fname` (relative to `self.path`) and delegate to `label_from_df`."
        df = pd.read_csv(self.path/csv_fname, header=header)
        return self.label_from_df(df, fn_col, label_col, sep, folder, suffix)
    def label_from_folder(self, classes:Collection[str]=None)->Collection:
        "Label each file with its immediate parent folder's name, keeping only `classes` if given."
        labels = [fn.parent.parts[-1] for fn in self.items]
        if classes is None: classes = uniqueify(labels)
        return LabelList([(o,lbl) for o, lbl in zip(self.items, labels) if lbl in classes], self.path)
#export
class LabelList(PathItemList):
    "A `PathItemList` of `(filename, label)` pairs with helpers to split into train/valid."
    @property
    def files(self):
        "The filenames (first column of the items array)."
        return self.items[:,0]
    def split_by_files(self, valid_fnames:FilePathList)->'SplitData':
        "Items whose file is in `valid_fnames` go to the validation set, the rest to training."
        # A set gives O(1) membership instead of an O(n) list scan per item.
        valid_set = set(valid_fnames)
        valid = [o for o in self.items if o[0] in valid_set]
        train = [o for o in self.items if o[0] not in valid_set]
        return SplitData(self.path, LabelList(train), LabelList(valid))
    def split_by_fname_file(self, fname:PathOrStr, path:PathOrStr=None)->'SplitData':
        "Split using the file names listed in text file `fname`, resolved against `path`."
        path = Path(ifnone(path, self.path))
        fnames = join_paths(loadtxt_str(self.path/fname), path)
        return self.split_by_files(fnames)
    def split_by_idx(self, valid_idx:Collection[int])->'SplitData':
        "Split by position: items at the indices in `valid_idx` form the validation set."
        valid_idx = set(valid_idx)  # hoisted set for O(1) membership in the comprehensions
        valid = [o for i,o in enumerate(self.items) if i in valid_idx]
        train = [o for i,o in enumerate(self.items) if i not in valid_idx]
        return SplitData(self.path, LabelList(train), LabelList(valid))
    def split_by_folder(self, train:str='train', valid:str='valid')->'SplitData':
        "Split by the name of the folder immediately below `self.path` each file lives in."
        n = len(self.path.parts)
        # (Removed an unused `folder_name` list the original computed and never read.)
        valid_items = [o for o in self.items if o[0].parent.parts[n] == valid]
        train_items = [o for o in self.items if o[0].parent.parts[n] == train]
        return SplitData(self.path, LabelList(train_items), LabelList(valid_items))
    def random_split_by_pct(self, valid_pct:float=0.2)->'SplitData':
        "Randomly send `valid_pct` of the items to the validation set."
        rand_idx = np.random.permutation(range(len(self.items)))
        cut = int(valid_pct * len(self.items))
        return self.split_by_idx(rand_idx[:cut])
#export
@dataclass
class SplitData():
    "Train/valid (and optional test) `LabelList`s rooted at a common `path`."
    path:PathOrStr
    train:LabelList
    valid:LabelList
    test: LabelList=None
    def __post_init__(self): self.path = Path(self.path)  # normalize str -> Path
    @property
    def lists(self):
        # All non-None lists, always ordered train, valid[, test].
        res = [self.train,self.valid]
        if self.test is not None: res.append(self.test)
        return res
    def datasets(self, dataset_cls:type, **kwargs)->'SplitDatasets':
        "Create datasets from the underlying data using `dataset_cls` and passing along the `kwargs`."
        # `items.T` unpacks the (filename, label) columns as positional args.
        train = dataset_cls(*self.train.items.T, **kwargs)
        dss = [train]
        # valid/test go through `train.new`, which presumably propagates state
        # inferred from the training data (e.g. classes) — TODO confirm.
        dss += [train.new(*o.items.T, **kwargs) for o in self.lists[1:]]
        # A dataset class may name an alternative container via `__splits_class__`.
        cls = getattr(train, '__splits_class__', SplitDatasets)
        return cls(self.path, *dss)
#export
@dataclass
class SplitDatasets():
    "Train/valid (and optional test) `Dataset`s with converters to loaders and data bunches."
    path:PathOrStr
    train_ds:Dataset
    valid_ds:Dataset
    test_ds:Optional[Dataset] = None
    @property
    def datasets(self): return [self.train_ds,self.valid_ds]  # NOTE: test_ds is not included
    def transform(self, tfms:TfmList, **kwargs)->'SplitDatasets':
        "Wrap each dataset in `DatasetTfm` in place: `tfms[0]` for train, `tfms[1]` for valid/test."
        assert not isinstance(self.train_ds, DatasetTfm)  # guard against double wrapping
        self.train_ds = DatasetTfm(self.train_ds, tfms[0], **kwargs)
        self.valid_ds = DatasetTfm(self.valid_ds, tfms[1], **kwargs)
        if self.test_ds is not None:
            self.test_ds = DatasetTfm(self.test_ds, tfms[1], **kwargs)
        return self
    def dataloaders(self, **kwargs):
        "One `DataLoader` per dataset in `self.datasets`."
        return [DataLoader(o, **kwargs) for o in self.datasets]
    def databunch(self, path=None, **kwargs):
        "Bundle the train/valid datasets into an `ImageDataBunch` rooted at `path` (default `self.path`)."
        path = Path(ifnone(path, self.path))
        return ImageDataBunch.create(*self.datasets, path=path, **kwargs)
# Rebuild the CamVid data with the new API above, first step by step:
tfms = get_transforms()
ifl = ImageFileList.from_folder(path_img); ifl
ll = ifl.label_from_func(get_y_fn); ll
sd = ll.split_by_fname_file('../valid.txt')  # valid.txt sits one level above images/
tfms = get_transforms()
dss = sd.datasets(SegmentationDataset, classes=codes)
dss.train_ds.classes
tdss = dss.transform(tfms, size=128, tfm_y=True)  # tfm_y: transform the masks identically
data = tdss.databunch()
# ...then the same pipeline as one fluent chain:
data = (ImageFileList.from_folder(path_img)
        .label_from_func(get_y_fn)
        .split_by_fname_file('../valid.txt')
        .datasets(SegmentationDataset, classes=codes)
        .transform(tfms, size=128, tfm_y=True)
        .databunch())
x,y = data.train_dl.one_batch()
show_xy_images(x,y,rows=3)