from nb_200 import *
path = untar_data(URLs.IMDB)
#export
from multiprocessing import Process, Queue
import spacy,html
from spacy.symbols import ORTH
from fastprogress import progress_bar,master_bar
import pickle,random
#export
#special tokens
UNK, PAD, BOS, EOS, FLD, TK_REP, TK_WREP, TK_UP, TK_MAJ = "xxunk xxpad xxbos xxeos xxfld xxrep xxwrep xxup xxmaj".split()
def sub_br(t):
"Replaces the
by \n"
re_br = re.compile(r'<\s*br\s*/?>', re.IGNORECASE)
return re_br.sub("\n", t)
def spec_add_spaces(t):
"Add spaces around / and #"
return re.sub(r'([/#])', r' \1 ', t)
def rm_useless_spaces(t):
"Remove multiple spaces"
return re.sub(' {2,}', ' ', t)
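#Illustrative checks of the three simple rules above (toy inputs)
sub_br("First line<br />Second line")
spec_add_spaces('#hashtag and/or')
rm_useless_spaces('too   many   spaces')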
def replace_rep(t):
"Replace repetitions at the character level: cccc -> TK_REP 4 c"
def _replace_rep(m:Collection[str]) -> str:
c,cc = m.groups()
return f' {TK_REP} {len(cc)+1} {c} '
re_rep = re.compile(r'(\S)(\1{3,})')
return re_rep.sub(_replace_rep, t)
def replace_wrep(t):
"Replace word repetitions: word word word -> TK_WREP 3 word"
def _replace_wrep(m:Collection[str]) -> str:
c,cc = m.groups()
return f' {TK_WREP} {len(cc.split())+1} {c} '
re_wrep = re.compile(r'(\b\w+\W+)(\1{3,})')
return re_wrep.sub(_replace_wrep, t)
def fixup_text(x):
"Various messy things we've seen in documents"
    re1 = re.compile(r'  +')
    x = x.replace('#39;', "'").replace('amp;', '&').replace('#146;', "'").replace(
        'nbsp;', ' ').replace('#36;', '$').replace('\\n', "\n").replace('quot;', "'").replace(
        '<br />', "\n").replace('\\"', '"').replace('<unk>', UNK).replace(' @.@ ', '.').replace(
        ' @-@ ', '-').replace('\\', ' \\ ')
return re1.sub(' ', html.unescape(x))
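#Illustrative call: fixup_text undoes common escaping artifacts seen in scraped text (toy input)
fixup_text("He said #39;hi#39; and paid #36;5 nbsp;only")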
default_pre_rules = [fixup_text, replace_rep, replace_wrep, spec_add_spaces, rm_useless_spaces, sub_br]
default_spec_tok = [UNK, PAD, BOS, EOS, FLD, TK_REP, TK_WREP, TK_UP, TK_MAJ]
replace_rep('cccc')
replace_wrep('word word word word word ')
#export
def replace_all_caps(x):
"Replace tokens in ALL CAPS by their lower version and add `TK_UP` before."
res = []
for t in x:
if t.isupper() and len(t) > 1: res.append(TK_UP); res.append(t.lower())
else: res.append(t)
return res
def deal_caps(x):
"Replace all Capitalized tokens in by their lower version and add `TK_MAJ` before."
res = []
for t in x:
if t == '': continue
if t[0].isupper() and len(t) > 1 and t[1:].islower(): res.append(TK_MAJ)
res.append(t.lower())
return res
def add_eos_bos(x): return [BOS] + x + [EOS]
default_post_rules = [deal_caps, replace_all_caps, add_eos_bos]
replace_all_caps(['I', 'AM', 'SHOUTING'])
deal_caps(['My', 'name', 'is', 'Jeremy'])
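#add_eos_bos just wraps the token stream with the BOS/EOS markers (toy input)
add_eos_bos(['this', 'is', 'a', 'review'])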
class BaseTokenizer():
def __init__(self, lang, special_toks): pass
def pipe(self, items):
for t in items: yield t.split(' ')
class SpacyTokenizer():
def __init__(self, lang='en', special_toks=None, batch_size=5000):
special_toks = ifnone(special_toks, default_spec_tok)
self.nlp = spacy.blank(lang, disable=["parser", "tagger", "ner"])
        for w in special_toks: self.nlp.tokenizer.add_special_case(w, [{ORTH: w}])
self.batch_size=batch_size
def pipe(self, items):
for doc in self.nlp.pipe(items, batch_size=self.batch_size):
yield [d.text for d in doc]
def apply_rules(items, rules):
for o in items: yield apply_all(o, rules)
def tokenize1(text, tok_func=SpacyTokenizer, pre_rules=None, post_rules=None, **tok_kwargs):
pre_rules = listify(ifnone(pre_rules, default_pre_rules.copy()))
post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
tokenizer = tok_func(**tok_kwargs)
for tok in tokenizer.pipe(apply_rules([text], pre_rules)):
tok = apply_all(tok, post_rules)
return tok
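#Illustrative end-to-end call: pre rules, spacy tokenization, then post rules (toy input)
tokenize1("This movie was AMAZING... I mean AMAZING!")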
def read_text(fname):
with open(fname, 'r') as f: return f.read()
def tok_items(items, tok_func, pre_rules, post_rules, output_func, output_queue, data_queue=None, **tok_kwargs):
tokenizer = tok_func(**tok_kwargs)
if data_queue: counts = Counter()
for i,tok in enumerate(tokenizer.pipe(apply_rules(items, pre_rules))):
tok = apply_all(tok, post_rules)
output_queue.put(output_func(items[i], tok))
if data_queue: counts.update(Counter(tok))
if data_queue: data_queue.put(counts)
def create_folders(path, output_dir, include=None):
output_dir = Path(output_dir)
os.makedirs(output_dir, exist_ok=True)
for i,(p,d,f) in enumerate(os.walk(path)): # returns (dirpath, dirnames, filenames)
if include is not None and i==0: d[:] = [o for o in d if o in include]
else: d[:] = [o for o in d if not o.startswith('.')]
for x in d: os.makedirs(output_dir/(Path(p)/Path(x)).relative_to(path), exist_ok=True)
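#Quick illustrative check of create_folders on throwaway temporary directories (hypothetical paths)
import tempfile
_src = Path(tempfile.mkdtemp()); (_src/'train'/'pos').mkdir(parents=True)
_dst = Path(tempfile.mkdtemp())/'out'
create_folders(_src, _dst)
assert (_dst/'train'/'pos').exists()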
SEP = '▁'
def tok_folder(path, extensions=['.txt'], include=None, output_dir=None, n_workers=4,
pre_rules=None, post_rules=None, tok_func=SpacyTokenizer, **tok_kwargs):
path = Path(path)
fnames = get_files(path, extensions=extensions, recurse=True, include=include)
output_dir = Path(ifnone(output_dir, path.parent/f'{path.name}_tok'))
create_folders(path, output_dir, include=include)
pre_rules = [read_text] + listify(ifnone(pre_rules, default_pre_rules.copy()))
post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
output_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)
def _output(o, tok):
out = output_dir/o.relative_to(path)
with open(out, 'w') as f: f.write(SEP.join(tok))
with open(out.parent/f'{out.stem}.len', 'w') as f: f.write(str(len(tok)))
return 1
processes = [Process(target=tok_items,
args=(batch, tok_func, pre_rules, post_rules, _output, output_queue),
kwargs={'data_queue': data_queue, **tok_kwargs})
for i,batch in enumerate(np.array_split(fnames, n_workers))]
for p in processes: p.start()
counter = Counter()
for _ in progress_bar(fnames, leave=False): _ = output_queue.get()
for _ in processes: counter.update(data_queue.get())
for p in processes: p.join()
pickle.dump(counter, open(output_dir/'counter.pkl','wb'))
path = untar_data(URLs.IMDB)
#Tokenize the whole IMDB folder (this takes a while on the full dataset)
tok_folder(path, include=['train', 'test', 'unsup'])
# test
fnames = get_files(path, extensions=['.txt'], recurse=True, include=['train', 'test', 'unsup'])
tok_path = path.parent/'imdb_tok'
assert tok_path.exists()
#Take one file randomly
idx = random.randint(0, len(fnames)-1)
#Check we have the corresponding tokenized version...
tok_fname = tok_path/(fnames[idx].relative_to(path))
assert tok_fname.exists()
text = read_text(fnames[idx])
tok = tokenize1(text)
assert SEP.join(tok) == read_text(tok_fname)
len_fname = tok_fname.parent/f'{tok_fname.stem}.len'
assert len(tok) == int(read_text(len_fname))
def join_texts(idx, df, mark_fields=False):
return ' '.join([(f'{FLD} {i} ' if mark_fields else '') + t for i,t in enumerate(df.iloc[int(idx)].values)])
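#Illustrative call on a toy dataframe: each column becomes a field marked with FLD
_df_toy = pd.DataFrame({'title': ['great'], 'body': ['loved it']})
join_texts(0, _df_toy, mark_fields=True)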
def tok_df(df, text_cols, n_workers=4, pre_rules=None, post_rules=None, mark_fields=None,
tok_func=SpacyTokenizer, **tok_kwargs):
text_cols = listify(text_cols)
mark_fields = ifnone(mark_fields, len(listify(text_cols)) > 1)
pre_rules = listify(ifnone(pre_rules, default_pre_rules.copy()))
pre_rules = [partial(join_texts, df=df[text_cols], mark_fields=mark_fields)] + pre_rules
post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
output_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)
def _output(o, tok): return (o,tok)
processes = [Process(target=tok_items,
args=(batch, tok_func, pre_rules, post_rules, _output, output_queue),
kwargs={'data_queue': data_queue, **tok_kwargs})
for i,batch in enumerate(np.array_split(range(len(df)), n_workers))]
for p in processes: p.start()
    lengths,outputs,counter = np.zeros(len(df)),np.zeros(len(df), dtype=object),Counter()
for _ in progress_bar(range(len(df)), leave=False):
i,tok = output_queue.get()
lengths[i],outputs[i] = len(tok),SEP.join(tok)
for _ in processes: counter.update(data_queue.get())
for p in processes: p.join()
other_cols = [c for c in df.columns if c not in text_cols]
res = df[other_cols].copy()
res['text'],res['text_lengths'] = outputs,lengths
return res, counter
# test
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
out,cnt = tok_df(df, text_cols='text')
test_eq(set(out.columns),set(list(df.columns)+['text_lengths']))
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), out['text'][idx])
test_eq(len(tok), out['text_lengths'][idx])
#With two fields, mark_fields becomes True by default
df['text1'] = df['text']
out,cnt = tok_df(df, text_cols=['text', 'text1'])
idx = random.randint(0, len(df)-1)
text = f"{FLD} 0 {df['text'][idx]} {FLD} 1 {df['text1'][idx]}"
tok = tokenize1(text)
test_eq(SEP.join(tok), out['text'][idx])
test_eq(len(tok), out['text_lengths'][idx])
def tok_csv(fname, text_cols, outname=None, n_workers=4, pre_rules=None, post_rules=None,
mark_fields=None, tok_func=SpacyTokenizer, header='infer', chunksize=None, **tok_kwargs):
df = pd.read_csv(fname, header=header, chunksize=chunksize)
outname = Path(ifnone(outname, fname.parent/f'{fname.stem}_tok.csv'))
kwargs = dict(n_workers=n_workers, pre_rules=pre_rules, post_rules=post_rules,
mark_fields=mark_fields, tok_func=tok_func, **tok_kwargs)
if chunksize is None:
out,cnt = tok_df(df, text_cols, **kwargs)
out.to_csv(outname, header=header, index=False)
else:
cnt = Counter()
for i,dfp in enumerate(df):
out,c = tok_df(dfp, text_cols, **kwargs)
out.to_csv(outname, header=header if i==0 else None, index=False, mode='w' if i==0 else 'a')
cnt.update(c)
pickle.dump(cnt, open(outname.parent/'counter.pkl', 'wb'))
#test
path = untar_data(URLs.IMDB_SAMPLE)
tok_csv(path/'texts.csv', 'text')
assert (path/'texts_tok.csv').exists()
df = pd.read_csv(path/'texts.csv')
df_tok = pd.read_csv(path/'texts_tok.csv')
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), df_tok['text'][idx])
test_eq(len(tok), df_tok['text_lengths'][idx])
#test
path = untar_data(URLs.IMDB_SAMPLE)
tok_csv(path/'texts.csv', 'text', chunksize=500)
assert (path/'texts_tok.csv').exists()
df = pd.read_csv(path/'texts.csv')
df_tok = pd.read_csv(path/'texts_tok.csv')
test_eq(len(df_tok), len(df))
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), df_tok['text'][idx])
test_eq(len(tok), df_tok['text_lengths'][idx])
import collections
class ReadTokens(Transform):
def __call__(self, o):
text = read_text(o) if isinstance(o, Path) else str(o)
return text.split(SEP)
def decode(self, o): return SEP.join(o)
def show(self, x, ax): print(x)
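#Illustrative round trip on a raw string (the pipeline normally passes Path objects)
ReadTokens()(SEP.join(['xxbos', 'hello', 'world']))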
class Numericalize(MultiCategorize):
_order = 5
def __init__(self, vocab):
self.vocab = vocab
self.o2i = collections.defaultdict(int, {w:i for i,w in enumerate(vocab)})
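#With a toy vocab (illustrative): known tokens map to their index, unseen tokens default to 0
_num = Numericalize(['xxunk', 'the', 'movie'])
[_num.o2i[w] for w in ['movie', 'never-seen-token']]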
class Text(Item):
tfm = [ReadTokens, Numericalize]
def text_getter(suf='', **kwargs):
def _inner(o, **kwargs):
return get_files(o/suf, extensions=['.txt'], recurse=True)
return _inner
class ImdbData(DataBlock):
types = (Text,Item)
get_items = text_getter()
split = random_splitter()
label_func = lambda fn,self: int(read_text(fn.parent/f'{fn.stem}.len'))
path = untar_data(URLs.IMDB)
path_tok = path.parent/'imdb_tok'
counter = pickle.load(open(path_tok/'counter.pkl', 'rb'))
vocab = [w for w,i in counter.most_common(60000) if i >= 2]
dblk = ImdbData(path_tok, tfms_x=[ReadTokens(), Numericalize(vocab)])
dsrc = dblk.datasource()
x,y = dsrc.get(0,0)
t = dsrc.decode((x,y))
t
class LM_PreLoader():
def __init__(self, fl, lengths=None, bs=64, bptt=70, shuffle=False):
self.fl,self.bs,self.bptt,self.shuffle = fl,bs,bptt,shuffle
self.lengths = [len(o[0]) for o in fl] if lengths is None else lengths
self.n_batch = sum(self.lengths) // bs
self.batchify()
def __len__(self): return ((self.n_batch-1) // self.bptt) * self.bs
def __getitem__(self, i):
k = (i % self.bs) * self.n_batch + (i // self.bs) * self.bptt
item_idx = (self.cumlen > k).nonzero().min().item()
offset = k if item_idx==0 else k-self.cumlen[item_idx-1]
        text = self.fl[int(self.idxs[item_idx])][0][offset:]
while len(text) <= self.bptt:
item_idx += 1
            text += self.fl[int(self.idxs[item_idx])][0]
return tensor(text[:self.bptt]),tensor(text[1:self.bptt+1])
    def batchify(self):
        self.idxs = torch.randperm(len(self.fl)) if self.shuffle else tensor(range(len(self.fl)))
        self.cumlen = (tensor(self.lengths)[self.idxs] if self.shuffle else tensor(self.lengths)).cumsum(0)
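#Toy sanity check with 5 fake documents of 100 tokens each (illustrative data)
_toy = [(list(range(100)), 0) for _ in range(5)]
_toy_ds = LM_PreLoader(_toy, bs=4, bptt=10)
_x,_y = _toy_ds[0]
test_equal(_x, tensor(list(range(10)))); test_equal(_y, tensor(list(range(1, 11))))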
#test
ds = LM_PreLoader(dsrc[0], lengths=lengths)
x,y = ds[0]
test_equal(x[1:], y[:-1])
x0,x1 = dsrc.get(0,0)[0],dsrc.get(1,0)[0]
test_equal(x, tensor(x0+x1)[:70])
test_equal(ds[64][0], tensor(x0+x1)[70:140])
k = ds.n_batch
x,y = ds[1]
offset = k - ds.cumlen[1262]
test_equal(x, tensor(dsrc.get(1263,0)[0][offset:offset+70]))
data = DataLoader(ds, 64, shuffle=False, num_workers=4)
%time for (x,y) in progress_bar(data): pass