from nb_200 import *
path = untar_data(URLs.IMDB)
# export
from multiprocessing import Process, Queue
import spacy,html
from spacy.symbols import ORTH
from fastprogress import progress_bar,master_bar
import pickle,random
Before even tokenizing, we apply a bit of preprocessing to clean the texts up (we saw above that one of them contained some HTML code). These rules are applied before the text is split into tokens.
#export
#special tokens
UNK, PAD, BOS, EOS, FLD, TK_REP, TK_WREP, TK_UP, TK_MAJ = "xxunk xxpad xxbos xxeos xxfld xxrep xxwrep xxup xxmaj".split()
def sub_br(t):
    "Replaces the <br /> by \n"
    re_br = re.compile(r'<\s*br\s*/?>', re.IGNORECASE)
    return re_br.sub("\n", t)

def spec_add_spaces(t):
    "Add spaces around / and #"
    return re.sub(r'([/#])', r' \1 ', t)

def rm_useless_spaces(t):
    "Remove multiple spaces"
    return re.sub(' {2,}', ' ', t)

def replace_rep(t):
    "Replace repetitions at the character level: cccc -> TK_REP 4 c"
    def _replace_rep(m:Collection[str]) -> str:
        c,cc = m.groups()
        return f' {TK_REP} {len(cc)+1} {c} '
    re_rep = re.compile(r'(\S)(\1{3,})')
    return re_rep.sub(_replace_rep, t)

def replace_wrep(t):
    "Replace word repetitions: word word word -> TK_WREP 3 word"
    def _replace_wrep(m:Collection[str]) -> str:
        c,cc = m.groups()
        return f' {TK_WREP} {len(cc.split())+1} {c} '
    re_wrep = re.compile(r'(\b\w+\W+)(\1{3,})')
    return re_wrep.sub(_replace_wrep, t)

def fixup_text(x):
    "Various messy things we've seen in documents"
    re1 = re.compile(r' +')
    x = x.replace('#39;', "'").replace('amp;', '&').replace('#146;', "'").replace(
        'nbsp;', ' ').replace('#36;', '$').replace('\\n', "\n").replace('quot;', "'").replace(
        '<br />', "\n").replace('\\"', '"').replace('<unk>',UNK).replace(' @.@ ','.').replace(
        ' @-@ ','-').replace('\\', ' \\ ')
    return re1.sub(' ', html.unescape(x))
default_pre_rules = [fixup_text, replace_rep, replace_wrep, spec_add_spaces, rm_useless_spaces, sub_br]
default_spec_tok = [UNK, PAD, BOS, EOS, FLD, TK_REP, TK_WREP, TK_UP, TK_MAJ]
replace_rep('cccc')
replace_wrep('word word word word word ')
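As a quick combined check, we can push a messy string through all the default pre-rules at once with apply_all (which applies each function in the list in turn), for instance:
apply_all("Ouch!!!! This movie was sooooo bad... <br /> I mean REALLY bad #fail", default_pre_rules)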
These rules are applied after tokenization, on the list of tokens.
#export
def replace_all_caps(x):
    "Replace tokens in ALL CAPS by their lower version and add `TK_UP` before."
    res = []
    for t in x:
        if t.isupper() and len(t) > 1: res.append(TK_UP); res.append(t.lower())
        else: res.append(t)
    return res
def deal_caps(x):
    "Replace all Capitalized tokens by their lower version and add `TK_MAJ` before."
    res = []
    for t in x:
        if t == '': continue
        if t[0].isupper() and len(t) > 1 and t[1:].islower(): res.append(TK_MAJ)
        res.append(t.lower())
    return res
def add_eos_bos(x): return [BOS] + x + [EOS]
default_post_rules = [deal_caps, replace_all_caps, add_eos_bos]
replace_all_caps(['I', 'AM', 'SHOUTING'])
deal_caps(['My', 'name', 'is', 'Jeremy'])
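We can chain all the default post-rules the same way, for instance:
add_eos_bos(['hello', 'world'])
apply_all(['My', 'name', 'is', 'Jeremy'], default_post_rules)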
A tokenizer should implement two methods: __init__, which takes a language and some special tokens, and pipe, which takes a generator of texts and returns a generator that yields the tokenized texts. batch_size is used by tokenizers like spacy that can process items in batches.
class BaseTokenizer():
    def __init__(self, lang, special_toks): pass
    def pipe(self, items):
        for t in items: yield t.split(' ')

class SpacyTokenizer():
    def __init__(self, lang='en', special_toks=None, batch_size=5000):
        special_toks = ifnone(special_toks, default_spec_tok)
        self.nlp = spacy.blank(lang, disable=["parser", "tagger", "ner"])
        for w in special_toks: self.nlp.tokenizer.add_special_case(w, [{ORTH: w}])
        self.batch_size = batch_size
    def pipe(self, items):
        for doc in self.nlp.pipe(items, batch_size=self.batch_size):
            yield [d.text for d in doc]
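A quick (illustrative) check of the pipe interface on a couple of short texts:
tkz = SpacyTokenizer()
list(tkz.pipe(["xxbos xxmaj this is a test .", "hello world !"]))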
def apply_rules(items, rules):
    for o in items: yield apply_all(o, rules)

def tokenize1(text, tok_func=SpacyTokenizer, pre_rules=None, post_rules=None, **tok_kwargs):
    pre_rules = listify(ifnone(pre_rules, default_pre_rules.copy()))
    post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
    tokenizer = tok_func(**tok_kwargs)
    for tok in tokenizer.pipe(apply_rules([text], pre_rules)):
        tok = apply_all(tok, post_rules)
    return tok
apply_rules returns a generator over items, applying rules to each of them.
A basic function that reads the content of a file.
def read_text(fname):
    with open(fname, 'r') as f: return f.read()
The main function that will be called during tokenization. It creates an instance of a tokenizer with tok_func and tok_kwargs, then iterates through the items: it applies the pre_rules to each one, tokenizes it, applies the post_rules, then applies output_func to the original item and its tokens and puts the result in output_queue.
If a data_queue is passed, we also count the tokens and put the resulting Counter in it at the end.
def tok_items(items, tok_func, pre_rules, post_rules, output_func, output_queue, data_queue=None, **tok_kwargs):
    tokenizer = tok_func(**tok_kwargs)
    if data_queue: counts = Counter()
    for i,tok in enumerate(tokenizer.pipe(apply_rules(items, pre_rules))):
        tok = apply_all(tok, post_rules)
        output_queue.put(output_func(items[i], tok))
        if data_queue: counts.update(Counter(tok))
    if data_queue: data_queue.put(counts)
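Since tok_items is what each worker process will run, we can sanity-check it in-process with small queues, for instance on a couple of toy strings:
out_q,data_q = Queue(),Queue()
tok_items(['Hello WORLD!', 'I am Jeremy'], SpacyTokenizer, default_pre_rules, default_post_rules,
          lambda o,tok: (o,tok), out_q, data_queue=data_q)
[out_q.get() for _ in range(2)], data_q.get()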
Helper function that recreates, inside output_dir, the same directory structure as in a given folder.
def create_folders(path, output_dir, include=None):
    output_dir = Path(output_dir)
    os.makedirs(output_dir, exist_ok=True)
    for i,(p,d,f) in enumerate(os.walk(path)): # returns (dirpath, dirnames, filenames)
        if include is not None and i==0: d[:] = [o for o in d if o in include]
        else: d[:] = [o for o in d if not o.startswith('.')]
        for x in d: os.makedirs(output_dir/(Path(p)/Path(x)).relative_to(path), exist_ok=True)
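A tiny (illustrative) check on a throwaway directory; the folder names here are arbitrary:
import tempfile
_tmp = Path(tempfile.mkdtemp())
os.makedirs(_tmp/'src'/'train'/'pos'); os.makedirs(_tmp/'src'/'test')
create_folders(_tmp/'src', _tmp/'dst', include=['train'])
assert (_tmp/'dst'/'train'/'pos').exists() and not (_tmp/'dst'/'test').exists()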
Preprocessing function for the texts in filenames. Tokenized texts will be saved with the same folder structure in a directory named like path with a _tok suffix, created in the parent folder of path (override with output_dir).
SEP = '▁'
fname = path/'labels.csv'
fname.suffix
def tok_folder(path, extensions=['.txt'], include=None, output_dir=None, n_workers=4,
               pre_rules=None, post_rules=None, tok_func=SpacyTokenizer, **tok_kwargs):
    path = Path(path)
    fnames = get_files(path, extensions=extensions, recurse=True, include=include)
    output_dir = Path(ifnone(output_dir, path.parent/f'{path.name}_tok'))
    create_folders(path, output_dir, include=include)
    pre_rules = [read_text] + listify(ifnone(pre_rules, default_pre_rules.copy()))
    post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
    output_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)

    def _output(o, tok):
        out = output_dir/o.relative_to(path)
        with open(out, 'w') as f: f.write(SEP.join(tok))
        with open(out.parent/f'{out.stem}.len', 'w') as f: f.write(str(len(tok)))
        return 1

    processes = [Process(target=tok_items,
                         args=(batch, tok_func, pre_rules, post_rules, _output, output_queue),
                         kwargs={'data_queue': data_queue, **tok_kwargs})
                 for i,batch in enumerate(np.array_split(fnames, n_workers))]
    for p in processes: p.start()
    counter = Counter()
    for _ in progress_bar(fnames, leave=False): _ = output_queue.get()
    for _ in processes: counter.update(data_queue.get())
    for p in processes: p.join()
    pickle.dump(counter, open(output_dir/'counter.pkl','wb'))
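The tests below assume the full IMDB corpus has already been tokenized with tok_folder; the call isn't shown in this section, but it was presumably along these lines (left commented out since it takes a while to run):
# tok_folder(path, include=['train', 'test', 'unsup'])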
path = untar_data(URLs.IMDB)
# test
fnames = get_files(path, extensions=['.txt'], recurse=True, include=['train', 'test', 'unsup'])
tok_path = path.parent/'imdb_tok'
assert tok_path.exists()
#Take one file randomly
idx = random.randint(0, len(fnames)-1)
#Check we have the corresponding tokenized version...
tok_fname = tok_path/(fnames[idx].relative_to(path))
assert tok_fname.exists()
text = read_text(fnames[idx])
tok = tokenize1(text)
assert SEP.join(tok) == read_text(tok_fname)
len_fname = tok_fname.parent/f'{tok_fname.stem}.len'
assert len(tok) == int(read_text(len_fname))
When the text is in a dataframe, we need to merge the text columns and, optionally, mark the fields.
def join_texts(idx, df, mark_fields=False):
    return ' '.join([(f'{FLD} {i} ' if mark_fields else '') + t for i,t in enumerate(df.iloc[int(idx)].values)])
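A quick illustration on a toy dataframe:
_df = pd.DataFrame({'title': ['hello'], 'body': ['world']})
join_texts(0, _df, mark_fields=True)  # -> 'xxfld 0 hello xxfld 1 world'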
Preprocessing function for texts in a dataframe. The tokenized texts are returned in a similar dataframe, with the text columns merged into a single text column (plus a text_lengths column) and the other columns unchanged.
def tok_df(df, text_cols, n_workers=4, pre_rules=None, post_rules=None, mark_fields=None,
           tok_func=SpacyTokenizer, **tok_kwargs):
    text_cols = listify(text_cols)
    mark_fields = ifnone(mark_fields, len(listify(text_cols)) > 1)
    pre_rules = listify(ifnone(pre_rules, default_pre_rules.copy()))
    pre_rules = [partial(join_texts, df=df[text_cols], mark_fields=mark_fields)] + pre_rules
    post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
    output_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)

    def _output(o, tok): return (o,tok)

    processes = [Process(target=tok_items,
                         args=(batch, tok_func, pre_rules, post_rules, _output, output_queue),
                         kwargs={'data_queue': data_queue, **tok_kwargs})
                 for i,batch in enumerate(np.array_split(range(len(df)), n_workers))]
    for p in processes: p.start()
    lengths,outputs,counter = np.zeros(len(df)),np.zeros(len(df), dtype=np.object),Counter()
    for _ in progress_bar(range(len(df)), leave=False):
        i,tok = output_queue.get()
        lengths[i],outputs[i] = len(tok),SEP.join(tok)
    for _ in processes: counter.update(data_queue.get())
    for p in processes: p.join()
    other_cols = [c for c in df.columns if c not in text_cols]
    res = df[other_cols].copy()
    res['text'],res['text_lengths'] = outputs,lengths
    return res, counter
# test
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
out,cnt = tok_df(df, text_cols='text')
test_eq(set(out.columns),set(list(df.columns)+['text_lengths']))
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), out['text'][idx])
test_eq(len(tok), out['text_lengths'][idx])
#With two fields, mark_fields becomes True by default
df['text1'] = df['text']
out,cnt = tok_df(df, text_cols=['text', 'text1'])
idx = random.randint(0, len(df)-1)
text = f"{FLD} 0 {df['text'][idx]} {FLD} 1 {df['text1'][idx]}"
tok = tokenize1(text)
test_eq(SEP.join(tok), out['text'][idx])
test_eq(len(tok), out['text_lengths'][idx])
def tok_csv(fname, text_cols, outname=None, n_workers=4, pre_rules=None, post_rules=None,
            mark_fields=None, tok_func=SpacyTokenizer, header='infer', chunksize=None, **tok_kwargs):
    df = pd.read_csv(fname, header=header, chunksize=chunksize)
    outname = Path(ifnone(outname, fname.parent/f'{fname.stem}_tok.csv'))
    kwargs = dict(n_workers=n_workers, pre_rules=pre_rules, post_rules=post_rules,
                  mark_fields=mark_fields, tok_func=tok_func, **tok_kwargs)
    if chunksize is None:
        out,cnt = tok_df(df, text_cols, **kwargs)
        out.to_csv(outname, header=header, index=False)
    else:
        cnt = Counter()
        for i,dfp in enumerate(df):
            out,c = tok_df(dfp, text_cols, **kwargs)
            out.to_csv(outname, header=header if i==0 else None, index=False, mode='w' if i==0 else 'a')
            cnt.update(c)
    pickle.dump(cnt, open(outname.parent/'counter.pkl', 'wb'))
#test
path = untar_data(URLs.IMDB_SAMPLE)
tok_csv(path/'texts.csv', 'text')
assert (path/'texts_tok.csv').exists()
df = pd.read_csv(path/'texts.csv')
df_tok = pd.read_csv(path/'texts_tok.csv')
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), df_tok['text'][idx])
test_eq(len(tok), df_tok['text_lengths'][idx])
#test
path = untar_data(URLs.IMDB_SAMPLE)
tok_csv(path/'texts.csv', 'text', chunksize=500)
assert (path/'texts_tok.csv').exists()
df = pd.read_csv(path/'texts.csv')
df_tok = pd.read_csv(path/'texts_tok.csv')
test_eq(len(df_tok), len(df))
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), df_tok['text'][idx])
test_eq(len(tok), df_tok['text_lengths'][idx])
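To plug the tokenized texts into the data blocks API, we need a transform that reads a tokenized file back into its list of tokens (ReadTokens) and one that maps each token to its index in a vocabulary (Numericalize).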
import collections
class ReadTokens(Transform):
    def __call__(self, o):
        text = read_text(o) if isinstance(o, Path) else str(o)
        return text.split(SEP)
    def decode(self, o): return SEP.join(o)
    def show(self, x, ax): print(x)

class Numericalize(MultiCategorize):
    _order = 5
    def __init__(self, vocab):
        self.vocab = vocab
        self.o2i = collections.defaultdict(int, {w:i for i,w in enumerate(vocab)})

class Text(Item):
    tfm = [ReadTokens, Numericalize]

def text_getter(suf='', **kwargs):
    def _inner(o, **kwargs):
        return get_files(o/suf, extensions=['.txt'], recurse=True)
    return _inner

class ImdbData(DataBlock):
    types = (Text,Item)
    get_items = text_getter()
    split = random_splitter()
    label_func = lambda fn,self: int(read_text(fn.parent/f'{fn.stem}.len'))
path = untar_data(URLs.IMDB)
path_tok = path.parent/'imdb_tok'
counter = pickle.load(open(path_tok/'counter.pkl', 'rb'))
vocab = [w for w,i in counter.most_common(60000) if i >= 2]
dblk = ImdbData(path_tok, tfms_x=[ReadTokens(), Numericalize(vocab)])
dsrc = dblk.datasource()
x,y = dsrc.get(0,0)
t = dsrc.decode((x,y))
t
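For language modeling, all the texts are treated as one contiguous stream of tokens that is cut into bs parallel streams, read bptt tokens at a time; LM_PreLoader indexes into the concatenated texts to return (input, target) pairs where the target is the input shifted by one token.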
class LM_PreLoader():
    def __init__(self, fl, lengths=None, bs=64, bptt=70, shuffle=False):
        self.fl,self.bs,self.bptt,self.shuffle = fl,bs,bptt,shuffle
        self.lengths = [len(o[0]) for o in fl] if lengths is None else lengths
        self.n_batch = sum(self.lengths) // bs
        self.batchify()

    def __len__(self): return ((self.n_batch-1) // self.bptt) * self.bs

    def __getitem__(self, i):
        k = (i % self.bs) * self.n_batch + (i // self.bs) * self.bptt
        item_idx = (self.cumlen > k).nonzero().min().item()
        offset = k if item_idx==0 else k-self.cumlen[item_idx-1]
        text = self.fl[item_idx][0][offset:]
        while len(text) <= self.bptt:
            item_idx += 1
            text += self.fl[item_idx][0]
        return tensor(text[:self.bptt]),tensor(text[1:self.bptt+1])

    def batchify(self):
        self.idxs = torch.randperm(len(self.fl)) if self.shuffle else tensor(range(len(self.fl)))
        self.cumlen = (tensor(self.lengths)[self.idxs] if self.shuffle else tensor(self.lengths)).cumsum(0)
#test
ds = LM_PreLoader(dsrc[0], lengths=lengths)
x,y = ds[0]
test_eq(x[1:], y[:-1])
x0,x1 = dsrc.get(0,0)[0],dsrc.get(1,0)[0]
test_eq(x, tensor(x0+x1)[:70])
test_eq(ds[64][0], tensor(x0+x1)[70:140])
k = ds.n_batch
x,y = ds[1]
offset = k - ds.cumlen[1262]
test_eq(x, tensor(dsrc.get(1263,0)[0][offset:offset+70]))
data = DataLoader(ds, 64, shuffle=False, num_workers=4)
%time for (x,y) in progress_bar(data): pass