from fastai.torch_core import *
from fastai.basic_data import *
from fastai.datasets import *
from fastai.text.learner import RNNLearner
from fastai.text.transform import PAD, UNK, FLD, Tokenizer
from concurrent.futures import ProcessPoolExecutor, as_completed
def save_texts(fname:PathOrStr, texts:Collection[str]):
    "Save `texts` in `fname`, one text per line."
with open(fname, 'w') as f:
for t in texts: f.write(f'{t}\n')
class Vocab():
"Contain the correspondance between numbers and tokens and numericalize."
def __init__(self, itos:Dict[int,str]):
self.itos = itos
self.stoi = collections.defaultdict(int,{v:k for k,v in enumerate(self.itos)})
def numericalize(self, t:Collection[str]) -> List[int]:
"Convert a list of tokens `t` to their ids."
return [self.stoi[w] for w in t]
def textify(self, nums:Collection[int]) -> List[str]:
"Convert a list of `nums` to their tokens."
        return ' '.join([self.itos[i] for i in nums])
@classmethod
def create(cls, tokens:Tokens, max_vocab:int, min_freq:int) -> 'Vocab':
"Create a vocabulary from a set of tokens."
freq = Counter(p for o in tokens for p in o)
        itos = [o for o,c in freq.most_common(max_vocab) if c >= min_freq]
itos.insert(0, PAD)
if UNK in itos: itos.remove(UNK)
itos.insert(0, UNK)
return cls(itos)
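# Quick sanity check of `Vocab` on toy tokens (hypothetical data, just to show the round trip
# between tokens and ids; the special UNK and PAD tokens take the first two ids):
toy_tokens = [['hello', 'world', 'hello'], ['hello', 'again']]
toy_vocab = Vocab.create(toy_tokens, max_vocab=100, min_freq=1)
toy_ids = toy_vocab.numericalize(['hello', 'again'])
toy_ids, toy_vocab.textify(toy_ids)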
class TextBase(LabelDataset):
    "Base class for text datasets, holding the inputs `x` (texts, tokens or ids) and optional `labels`."
def __init__(self, x:Collection[Any], labels:Collection[Union[int,float]]=None, classes:Collection[Any]=None):
super().__init__(classes=classes)
self.x = np.array(x)
self.y = np.zeros(len(x)) if labels is None else np.array(labels)
text_extensions = ['.txt']
class NumericalizedDataset(TextBase):
"To directly create a text datasets from `ids` and `labels`."
def __init__(self, vocab:Vocab, ids:Collection[Collection[int]], labels:Collection[Union[int,float]]=None,
classes:Collection[Any]=None):
super().__init__(ids, labels, classes)
self.vocab, self.vocab_size = vocab, len(vocab.itos)
self.loss_func = F.cross_entropy if len(self.y.shape) <= 1 else F.binary_cross_entropy_with_logits
    def get_text_item(self, idx):
        "Return the text and class of the item at `idx`."
return self.vocab.textify(self.x[idx]), self.classes[self.y[idx]]
    def save(self, path:Path, name:str):
        "Save the ids, labels, vocab and classes in `path` with prefix `name`."
os.makedirs(path, exist_ok=True)
np.save(path/f'{name}_ids.npy', self.x)
np.save(path/f'{name}_lbl.npy', self.y)
pickle.dump(self.vocab.itos, open(path/'itos.pkl', 'wb'))
save_texts(path/'classes.txt', self.classes)
@classmethod
    def load(cls, path:Path, name:str):
        "Load a `NumericalizedDataset` saved in `path` with prefix `name`."
vocab = Vocab(pickle.load(open(path/f'itos.pkl', 'rb')))
x,y = np.load(path/f'{name}_ids.npy'), np.load(path/f'{name}_lbl.npy')
classes = loadtxt_str(path/'classes.txt')
return cls(vocab, x, y, classes)
class TokenizedDataset(TextBase):
    "A dataset of tokenized texts, ready to be numericalized."
def __init__(self, tokens:Collection[Collection[str]], labels:Collection[Union[int,float]]=None,
classes:Collection[Any]=None):
super().__init__(tokens, labels, classes)
    def save(self, path:Path, name:str):
        "Save the tokens, labels and classes in `path` with prefix `name`."
os.makedirs(path, exist_ok=True)
        np.save(path/f'{name}_tok.npy', self.x)
        np.save(path/f'{name}_lbl.npy', self.y)
        save_texts(path/'classes.txt', self.classes)
    def numericalize(self, vocab:Vocab=None, max_vocab:int=60000, min_freq:int=2):
        "Numericalize the tokens with `vocab` if given, otherwise create a `Vocab` from the tokens."
vocab = ifnone(vocab, Vocab.create(self.x, max_vocab, min_freq))
ids = np.array([vocab.numericalize(t) for t in self.x])
return NumericalizedDataset(vocab, ids, self.y, self.classes)
class TextDataset(TextBase):
"Basic dataset for NLP tasks."
def __init__(self, texts:Collection[str], labels:Collection[Union[int,float]]=None,
classes:Collection[Any]=None):
super().__init__(texts, labels, classes)
@classmethod
def from_df(cls, df:DataFrame, classes:Collection[Any]=None, n_labels:int=1, txt_cols:Collection[Union[int,str]]=None,
label_cols:Collection[Union[int,str]]=None, mark_fields:bool=True) -> 'TextDataset':
"Create a `TextDataset` from the texts in a dataframe"
label_cols = ifnone(label_cols, list(range(n_labels)))
if classes is None:
if len(label_cols) == 0: classes = [0]
            elif len(label_cols) == 1: classes = df[label_cols[0]].unique()
else: classes = label_cols
lbl_type = np.float32 if len(label_cols) > 1 else np.int64
lbls = df[label_cols].values.astype(lbl_type) if (len(label_cols) > 0) else [0] * len(df)
txt_cols = ifnone(txt_cols, list(range(len(label_cols),len(df.columns))))
texts = f'{FLD} {1} ' + df[txt_cols[0]].astype(str) if mark_fields else df[txt_cols[0]].astype(str)
for i, col in enumerate(txt_cols[1:]):
texts += (f' {FLD} {i+2} ' if mark_fields else ' ') + df[col].astype(str)
return cls(texts.values, np.squeeze(lbls), classes)
@staticmethod
    def _folder_files(folder:Path, label:str, extensions:Collection[str]=text_extensions)->Tuple[str,str]:
"From `folder` return texts in files and labels. The labels are all `label`."
        fnames = get_files(folder, extensions=extensions)
texts = []
        for fn in fnames:
            with open(fn, 'r') as f: texts.append(f.read())
return texts,[label]*len(texts)
@classmethod
    def from_folder(cls, path:PathOrStr, classes:Collection[Any]=None,
                    extensions:Collection[str]=text_extensions) -> 'TextDataset':
        "Create a `TextDataset` from the text files in a folder."
        path = Path(path)
        classes = ifnone(classes, [c.name for c in find_classes(path)])
        texts, labels, keep = [], [], {}
        for cl in classes:
            t,l = cls._folder_files(path/cl, cl, extensions=extensions)
            texts+=t; labels+=l
            keep[cl] = len(t)
        classes = [cl for cl in classes if keep[cl]]
return cls(texts, labels, classes)
@classmethod
    def from_one_folder(cls, path:PathOrStr, classes:Collection[Any], shuffle:bool=True,
                        extensions:Collection[str]=text_extensions) -> 'TextDataset':
"Create a dataset from one folder, labelled `classes[0]` (used for the test set)."
path = Path(path)
        texts,labels = cls._folder_files(path, classes[0], extensions=extensions)
return cls(texts, labels, classes)
    def tokenize(self, tokenizer:Tokenizer=None, chunksize:int=10000):
        "Tokenize the texts with `tokenizer` by chunks of size `chunksize`."
tokenizer = ifnone(tokenizer, Tokenizer())
tokens = []
for i in progress_bar(range(0,len(self.x),chunksize), leave=False):
tokens += tokenizer.process_all(self.x[i:i+chunksize])
return TokenizedDataset(tokens, self.y, self.classes)
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'train.csv', header=None)
ds = TextDataset.from_df(df, classes=['negative', 'positive'])
train_ds = ds.tokenize().numericalize()
train_ds.get_text_item(0)
train_ds.save(path/'tmp', 'train')
train_ds = NumericalizedDataset.load(path/'tmp', 'train')
train_ds.get_text_item(1)
class LanguageModelLoader():
"Create a dataloader with bptt slightly changing."
def __init__(self, dataset:TextDataset, bs:int=64, bptt:int=70, backwards:bool=False):
self.dataset,self.bs,self.bptt,self.backwards = dataset,bs,bptt,backwards
self.data = self.batchify(np.concatenate(dataset.x))
self.first,self.i,self.iter = True,0,0
self.n = len(self.data)
self.num_workers = 0
def __iter__(self):
if getattr(self.dataset, 'item', None) is not None:
yield LongTensor(getattr(self.dataset, 'item')).unsqueeze(1),LongTensor([0])
self.i,self.iter = 0,0
while self.i < self.n-1 and self.iter<len(self):
if self.first and self.i == 0: self.first,seq_len = False,self.bptt + 25
else:
bptt = self.bptt if np.random.random() < 0.95 else self.bptt / 2.
seq_len = max(5, int(np.random.normal(bptt, 5)))
res = self.get_batch(self.i, seq_len)
self.i += seq_len
self.iter += 1
yield res
def __len__(self) -> int: return (self.n-1) // self.bptt
def batchify(self, data:np.ndarray) -> LongTensor:
"Split the corpus `data` in batches."
nb = data.shape[0] // self.bs
data = np.array(data[:nb*self.bs]).reshape(self.bs, -1).T
if self.backwards: data=data[::-1].copy()
return LongTensor(data)
def get_batch(self, i:int, seq_len:int) -> Tuple[LongTensor, LongTensor]:
"Create a batch at `i` of a given `seq_len`."
seq_len = min(seq_len, len(self.data) - 1 - i)
return self.data[i:i+seq_len], self.data[i+1:i+1+seq_len].contiguous().view(-1)
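# A minimal sketch (reuses `train_ds` built above): pull one batch from the language-model loader
# and check that inputs are (seq_len, bs) slices of the concatenated ids and that targets are the
# same ids shifted by one token, flattened. The bs/bptt values here are just illustrative.
lm_loader = LanguageModelLoader(train_ds, bs=4, bptt=10)
x, y = next(iter(lm_loader))
x.shape, y.shape  # x: (seq_len, 4), y: (seq_len * 4,)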
class SortSampler(Sampler):
"Go through the text data by order of length."
def __init__(self, data_source:NPArrayList, key:KeyFunc): self.data_source,self.key = data_source,key
def __len__(self) -> int: return len(self.data_source)
def __iter__(self):
return iter(sorted(range_of(self.data_source), key=self.key, reverse=True))
class SortishSampler(Sampler):
"Go through the text data by order of length with a bit of randomness."
def __init__(self, data_source:NPArrayList, key:KeyFunc, bs:int):
self.data_source,self.key,self.bs = data_source,key,bs
def __len__(self) -> int: return len(self.data_source)
def __iter__(self):
idxs = np.random.permutation(len(self.data_source))
sz = self.bs*50
ck_idx = [idxs[i:i+sz] for i in range(0, len(idxs), sz)]
sort_idx = np.concatenate([sorted(s, key=self.key, reverse=True) for s in ck_idx])
sz = self.bs
ck_idx = [sort_idx[i:i+sz] for i in range(0, len(sort_idx), sz)]
max_ck = np.argmax([self.key(ck[0]) for ck in ck_idx]) # find the chunk with the largest key,
ck_idx[0],ck_idx[max_ck] = ck_idx[max_ck],ck_idx[0] # then make sure it goes first.
        sort_idx = np.concatenate(np.random.permutation(ck_idx[1:])) if len(ck_idx) > 1 else np.array([], dtype=int)
sort_idx = np.concatenate((ck_idx[0], sort_idx))
return iter(sort_idx)
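# Toy illustration of `SortishSampler` (hypothetical lengths): indices come out roughly sorted by
# decreasing length, with some shuffling between chunks and the chunk holding the longest item first.
toy_texts = [list(range(n)) for n in (3, 10, 1, 7, 5, 2, 8, 4)]
list(SortishSampler(toy_texts, key=lambda i: len(toy_texts[i]), bs=2))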
def pad_collate(samples:BatchSamples, pad_idx:int=1, pad_first:bool=True) -> Tuple[LongTensor, LongTensor]:
"Function that collect samples and adds padding."
max_len = max([len(s[0]) for s in samples])
res = torch.zeros(max_len, len(samples)).long() + pad_idx
for i,s in enumerate(samples):
if pad_first: res[-len(s[0]):,i] = LongTensor(s[0])
        else: res[:len(s[0]),i] = LongTensor(s[0])
return res, tensor([s[1] for s in samples])
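# Sketch of what `pad_collate` produces on two toy numericalized samples of different lengths:
# the shorter one is left-padded (pad_first=True) with `pad_idx` so both stack into one
# (max_len, batch_size) LongTensor, and the labels are gathered into a second tensor.
toy_samples = [([2, 3, 4, 5], 0), ([6, 7], 1)]
pad_collate(toy_samples, pad_idx=1, pad_first=True)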
def _parse_kwargs(kwargs):
    "Split `kwargs` between the arguments for `TextDataset`, `tokenize` and `numericalize`."
txt_kwargs, kwargs = extract_kwargs(['n_labels', 'txt_cols', 'label_cols'], kwargs)
tok_kwargs, kwargs = extract_kwargs(['chunksize'], kwargs)
num_kwargs, kwargs = extract_kwargs(['max_vocab', 'min_freq'], kwargs)
return txt_kwargs, tok_kwargs, num_kwargs, kwargs
class TextDataBunch(DataBunch):
    "General `DataBunch` for NLP, subclassed by `TextLMDataBunch` and `TextClasDataBunch`."
    def save(self, cache_name:str='tmp'):
        "Save the ids, labels, vocab and classes of the datasets in `self.path/cache_name`."
        cache_path = self.path/cache_name
        os.makedirs(cache_path, exist_ok=True)
pickle.dump(self.train_ds.vocab.itos, open(cache_path/f'itos.pkl', 'wb'))
np.save(cache_path/f'train_ids.npy', self.train_ds.x)
np.save(cache_path/f'train_lbl.npy', self.train_ds.y)
np.save(cache_path/f'valid_ids.npy', self.valid_ds.x)
np.save(cache_path/f'valid_lbl.npy', self.valid_ds.y)
if self.test_dl is not None: np.save(cache_path/f'test_ids.npy', self.test_ds.x)
save_texts(cache_path/'classes.txt', self.train_ds.classes)
@classmethod
def from_ids(cls, path:PathOrStr, vocab:Vocab, trn_ids:Collection[Collection[int]], val_ids:Collection[Collection[int]],
tst_ids:Collection[Collection[int]]=None, trn_lbls:Collection[Union[int,float]]=None,
val_lbls:Collection[Union[int,float]]=None, classes:Collection[Any]=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from ids, labels and a dictionary."
train_ds = NumericalizedDataset(vocab, trn_ids, trn_lbls, classes)
datasets = [train_ds, NumericalizedDataset(vocab, val_ids, val_lbls, classes)]
if tst_ids is not None: datasets.append(NumericalizedDataset(vocab, tst_ids, None, classes))
return cls.create(datasets, path, **kwargs)
@classmethod
    def load(cls, path:PathOrStr, **kwargs):
        "Load a `TextDataBunch` from the ids, labels and vocab cached in `path/'tmp'`."
        cache_path = Path(path)/'tmp'
vocab = Vocab(pickle.load(open(cache_path/f'itos.pkl', 'rb')))
trn_ids,trn_lbls = np.load(cache_path/f'train_ids.npy'), np.load(cache_path/f'train_lbl.npy')
val_ids,val_lbls = np.load(cache_path/f'valid_ids.npy'), np.load(cache_path/f'valid_lbl.npy')
tst_ids = np.load(cache_path/f'test_ids.npy') if os.path.isfile(cache_path/f'test_ids.npy') else None
classes = loadtxt_str(cache_path/'classes.txt')
return cls.from_ids(path, vocab, trn_ids, val_ids, tst_ids, trn_lbls, val_lbls, classes, **kwargs)
@classmethod
def from_tokens(cls, path:PathOrStr, trn_tok:Collection[Collection[str]], trn_lbls:Collection[Union[int,float]],
val_tok:Collection[Collection[str]], val_lbls:Collection[Union[int,float]], vocab:Vocab=None,
tst_tok:Collection[Collection[str]]=None, classes:Collection[Any]=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from tokens and labels."
num_kwargs, kwargs = extract_kwargs(['max_vocab', 'min_freq'], kwargs)
train_ds = TokenizedDataset(trn_tok, trn_lbls, classes).numericalize(vocab, **num_kwargs)
        datasets = [train_ds, TokenizedDataset(val_tok, val_lbls, classes).numericalize(train_ds.vocab)]
        if tst_tok is not None: datasets.append(TokenizedDataset(tst_tok, [0]*len(tst_tok), classes).numericalize(train_ds.vocab))
return cls.create(datasets, path, **kwargs)
@classmethod
def from_df(cls, path:PathOrStr, train_df:DataFrame, valid_df:DataFrame, test_df:Optional[DataFrame]=None,
tokenizer:Tokenizer=None, vocab:Vocab=None, classes:Collection[str]=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from DataFrames."
txt_kwargs, tok_kwargs, num_kwargs, kwargs = _parse_kwargs(kwargs)
datasets = [(TextDataset.from_df(train_df, classes, **txt_kwargs)
.tokenize(tokenizer, **tok_kwargs)
.numericalize(vocab, **num_kwargs))]
dfs = [valid_df] if test_df is None else [valid_df, test_df]
for df in dfs:
datasets.append((TextDataset.from_df(df, classes, **txt_kwargs)
.tokenize(tokenizer, **tok_kwargs)
.numericalize(datasets[0].vocab, **num_kwargs)))
return cls.create(datasets, path, **kwargs)
@classmethod
def from_csv(cls, path:PathOrStr, train:str='train', valid:str='valid', test:Optional[str]=None,
tokenizer:Tokenizer=None, vocab:Vocab=None, classes:Collection[str]=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from texts in csv files."
header = 'infer' if 'txt_cols' in kwargs else None
train_df = pd.read_csv(os.path.join(path, train+'.csv'), header=header)
valid_df = pd.read_csv(os.path.join(path, valid+'.csv'), header=header)
test_df = None if test is None else pd.read_csv(os.path.join(path, test+'.csv'), header=header)
return cls.from_df(path, train_df, valid_df, test_df, tokenizer, vocab, classes, **kwargs)
@classmethod
    def from_folder(cls, path:PathOrStr, train:str='train', valid:str='valid', test:Optional[str]=None,
                    tokenizer:Tokenizer=None, vocab:Vocab=None, classes:Collection[Any]=None, **kwargs):
"Create a `TextDataBunch` from text files in folders."
txt_kwargs, tok_kwargs, num_kwargs, kwargs = _parse_kwargs(kwargs)
        path = Path(path)
        train_ds = (TextDataset.from_folder(path/train, classes, **txt_kwargs)
.tokenize(tokenizer, **tok_kwargs)
.numericalize(vocab, **num_kwargs))
        datasets = [train_ds, (TextDataset.from_folder(path/valid, train_ds.classes, **txt_kwargs)
.tokenize(tokenizer, **tok_kwargs)
.numericalize(train_ds.vocab, **num_kwargs))]
if test:
            datasets.append((TextDataset.from_one_folder(path/test, train_ds.classes, **txt_kwargs)
.tokenize(tokenizer, **tok_kwargs)
.numericalize(train_ds.vocab, **num_kwargs)))
return cls.create(datasets, path, **kwargs)
@classmethod
def create(cls, datasets:Collection[TextDataset], path:PathOrStr, **kwargs) -> DataBunch:
"Call's `DataBunch.create` but changes the arguments so it'll work OK"
return DataBunch.create(*datasets, path=path, **kwargs)
class TextLMDataBunch(TextDataBunch):
"Create a `TextDataBunch` suitable for training a language model."
@classmethod
def create(cls, datasets:Collection[TextDataset], path:PathOrStr, **kwargs) -> DataBunch:
"Create a `TextDataBunch` in `path` from the `datasets` for language modelling."
dataloaders = [LanguageModelLoader(ds, **kwargs) for ds in datasets]
return cls(*dataloaders, path=path)
class TextClasDataBunch(TextDataBunch):
"Create a `TextDataBunch` suitable for training an RNN classifier."
@classmethod
def create(cls, datasets:Collection[TextDataset], path:PathOrStr, bs=64, pad_idx=1, pad_first=True, **kwargs) -> DataBunch:
"Function that transform the `datasets` in a `DataBunch` for classification."
collate_fn = partial(pad_collate, pad_idx=pad_idx, pad_first=pad_first)
train_sampler = SortishSampler(datasets[0].x, key=lambda t: len(datasets[0].x[t]), bs=bs//2)
train_dl = DataLoader(datasets[0], batch_size=bs//2, sampler=train_sampler, **kwargs)
dataloaders = [train_dl]
for ds in datasets[1:]:
sampler = SortSampler(ds.x, key=lambda t: len(ds.x[t]))
dataloaders.append(DataLoader(ds, batch_size=bs, sampler=sampler, **kwargs))
return cls(*dataloaders, path=path, collate_fn=collate_fn)
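# Hypothetical alternative (a sketch, not run here): the same kind of DataBunch could be built from
# an IMDB-style folder layout, with one sub-folder of .txt files per class, e.g.
# imdb_path/train/neg, imdb_path/train/pos, imdb_path/valid/neg, imdb_path/valid/pos:
# data_clas = TextClasDataBunch.from_folder(imdb_path, train='train', valid='valid', classes=['neg', 'pos'])
# Below, the csv version of the IMDB sample is used instead.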
data = TextLMDataBunch.from_csv(path, classes=['negative', 'positive'])
data.save()
learn = RNNLearner.language_model(data, pretrained_model=URLs.WT103)
example_text = "I would like to know which word comes after this sentence"
class LanguageLearner(RNNLearner):
def predict(self, text:str, n_words:int=1, tokenizer:Tokenizer=None):
"Return the `n_words` that come after `text`."
tokenizer = ifnone(tokenizer, Tokenizer())
tokens = tokenizer.process_all([text])
ds = self.data.valid_ds
ids = ds.vocab.numericalize(tokens[0])
self.model.reset()
for _ in progress_bar(range(n_words)):
ds.set_item(ids)
res = self.pred_batch()
            ids.append(res[-1].argmax().item())
ds.clear_item()
return self.data.train_ds.vocab.textify(ids)
learn = LanguageLearner.language_model(data, pretrained_model=URLs.WT103)
learn.predict("Jeremy Howard is", 100)
class TextClassifierLearner(RNNLearner):
def predict(self, text:str, tokenizer:Tokenizer=None):
"Return prect class, label and probabilities for `text`."
tokenizer = ifnone(tokenizer, Tokenizer())
tokens = tokenizer.process_all([text])
ds = self.data.valid_ds
ids = ds.vocab.numericalize(tokens[0])
self.model.reset()
ds.set_item(ids)
res = self.pred_batch()[0]
ds.clear_item()
pred_max = res.argmax()
return self.data.train_ds.classes[pred_max],pred_max,res
from fastai.text.learner import get_rnn_classifier, rnn_classifier_split
def text_classifier(data:DataBunch, bptt:int=70, max_len:int=70*20, emb_sz:int=400, nh:int=1150, nl:int=3,
lin_ftrs:Collection[int]=None, ps:Collection[float]=None, pad_token:int=1,
drop_mult:float=1., qrnn:bool=False, **kwargs) -> 'RNNLearner':
"Create a RNN classifier."
dps = np.array([0.4,0.5,0.05,0.3,0.4]) * drop_mult
if lin_ftrs is None: lin_ftrs = [50]
if ps is None: ps = [0.1]
ds = data.train_ds
vocab_size, lbl = ds.vocab_size, ds.y[0]
n_class = (len(ds.classes) if (not isinstance(lbl, Iterable) or (len(lbl) == 1)) else len(lbl))
layers = [emb_sz*3] + lin_ftrs + [n_class]
ps = [dps[4]] + ps
model = get_rnn_classifier(bptt, max_len, n_class, vocab_size, emb_sz, nh, nl, pad_token,
layers, ps, input_p=dps[0], weight_p=dps[1], embed_p=dps[2], hidden_p=dps[3], qrnn=qrnn)
learn = TextClassifierLearner(data, model, bptt, split_func=rnn_classifier_split, **kwargs)
return learn
data = TextClasDataBunch.load(path)
learn = text_classifier(data)
example_text = "I really liked that movie, it was just the best I ever saw!"
learn.predict(example_text)