from nb_200 import *

path = untar_data(URLs.IMDB)

# export
from multiprocessing import Process, Queue
import spacy,html
from spacy.symbols import ORTH
from fastprogress import progress_bar,master_bar
import pickle,random

#export
#special tokens
UNK, PAD, BOS, EOS, FLD, TK_REP, TK_WREP, TK_UP, TK_MAJ = "xxunk xxpad xxbos xxeos xxfld xxrep xxwrep xxup xxmaj".split()

def sub_br(t):
    "Replaces the <br /> tags by \n"
    re_br = re.compile(r'<\s*br\s*/?>', re.IGNORECASE)
    return re_br.sub("\n", t)
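# Quick sanity check of `sub_br` on a made-up IMDB-style snippet (the expected output in
# the comment is worked out by hand from the regex above, not a stored notebook result).
sub_br("This movie was great.<br /><br />Really great.")
# -> 'This movie was great.\n\nReally great.'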
def spec_add_spaces(t):
    "Add spaces around / and #"
    return re.sub(r'([/#])', r' \1 ', t)

def rm_useless_spaces(t):
    "Remove multiple spaces"
    return re.sub(' {2,}', ' ', t)

def replace_rep(t):
    "Replace repetitions at the character level: cccc -> TK_REP 4 c"
    def _replace_rep(m:Collection[str]) -> str:
        c,cc = m.groups()
        return f' {TK_REP} {len(cc)+1} {c} '
    re_rep = re.compile(r'(\S)(\1{3,})')
    return re_rep.sub(_replace_rep, t)

def replace_wrep(t):
    "Replace word repetitions: word word word -> TK_WREP 3 word"
    def _replace_wrep(m:Collection[str]) -> str:
        c,cc = m.groups()
        return f' {TK_WREP} {len(cc.split())+1} {c} '
    re_wrep = re.compile(r'(\b\w+\W+)(\1{3,})')
    return re_wrep.sub(_replace_wrep, t)

def fixup_text(x):
    "Various messy things we've seen in documents"
    re1 = re.compile(r'  +')
    x = x.replace('#39;', "'").replace('amp;', '&').replace('#146;', "'").replace(
        'nbsp;', ' ').replace('#36;', '$').replace('\\n', "\n").replace('quot;', "'").replace(
        '<br />', "\n").replace('\\"', '"').replace('<unk>', UNK).replace(' @.@ ','.').replace(
        ' @-@ ','-').replace('\\', ' \\ ')
    return re1.sub(' ', html.unescape(x))
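# Quick sanity check of `fixup_text` on a made-up messy string (the expected output in the
# comment is worked out by hand from the replacements above, not a stored notebook result).
fixup_text('I really nbsp;liked it, the film #39;s ending was  great')
# -> "I really liked it, the film 's ending was great"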
default_pre_rules = [fixup_text, replace_rep, replace_wrep, spec_add_spaces, rm_useless_spaces, sub_br]
default_spec_tok = [UNK, PAD, BOS, EOS, FLD, TK_REP, TK_WREP, TK_UP, TK_MAJ]

replace_rep('cccc')
replace_wrep('word word word word word ')

#export
def replace_all_caps(x):
    "Replace tokens in ALL CAPS by their lower version and add `TK_UP` before."
    res = []
    for t in x:
        if t.isupper() and len(t) > 1: res.append(TK_UP); res.append(t.lower())
        else: res.append(t)
    return res

def deal_caps(x):
    "Replace all Capitalized tokens by their lower version and add `TK_MAJ` before."
    res = []
    for t in x:
        if t == '': continue
        if t[0].isupper() and len(t) > 1 and t[1:].islower(): res.append(TK_MAJ)
        res.append(t.lower())
    return res

def add_eos_bos(x): return [BOS] + x + [EOS]

default_post_rules = [deal_caps, replace_all_caps, add_eos_bos]

replace_all_caps(['I', 'AM', 'SHOUTING'])
deal_caps(['My', 'name', 'is', 'Jeremy'])

class BaseTokenizer():
    "Basic tokenizer that just splits on spaces"
    def __init__(self, lang, special_toks): pass
    def pipe(self, items):
        for t in items: yield t.split(' ')

class SpacyTokenizer():
    "Tokenizer wrapping a blank spaCy model, with the special tokens registered as special cases"
    def __init__(self, lang='en', special_toks=None, batch_size=5000):
        special_toks = ifnone(special_toks, default_spec_tok)
        self.nlp = spacy.blank(lang, disable=["parser", "tagger", "ner"])
        for w in special_toks: self.nlp.tokenizer.add_special_case(w, [{ORTH: w}])
        self.batch_size = batch_size

    def pipe(self, items):
        for doc in self.nlp.pipe(items, batch_size=self.batch_size):
            yield [d.text for d in doc]

def apply_rules(items, rules):
    "Lazily apply all `rules` to each element of `items`"
    for o in items: yield apply_all(o, rules)

def tokenize1(text, tok_func=SpacyTokenizer, pre_rules=None, post_rules=None, **tok_kwargs):
    "Tokenize a single `text` with the given pre/post rules and tokenizer class"
    pre_rules  = listify(ifnone(pre_rules,  default_pre_rules.copy()))
    post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
    tokenizer = tok_func(**tok_kwargs)
    for tok in tokenizer.pipe(apply_rules([text], pre_rules)):
        tok = apply_all(tok, post_rules)
    return tok

def read_text(fname):
    with open(fname, 'r') as f: return f.read()

def tok_items(items, tok_func, pre_rules, post_rules, output_func, output_queue, data_queue=None, **tok_kwargs):
    "Tokenize `items` in one process, putting results on `output_queue` and token counts on `data_queue`"
    tokenizer = tok_func(**tok_kwargs)
    if data_queue: counts = Counter()
    for i,tok in enumerate(tokenizer.pipe(apply_rules(items, pre_rules))):
        tok = apply_all(tok, post_rules)
        output_queue.put(output_func(items[i], tok))
        if data_queue: counts.update(Counter(tok))
    if data_queue: data_queue.put(counts)

def create_folders(path, output_dir, include=None):
    "Replicate the folder structure of `path` (top level optionally restricted to `include`) inside `output_dir`"
    output_dir = Path(output_dir)
    os.makedirs(output_dir, exist_ok=True)
    for i,(p,d,f) in enumerate(os.walk(path)): # returns (dirpath, dirnames, filenames)
        if include is not None and i==0: d[:] = [o for o in d if o in include]
        else:                            d[:] = [o for o in d if not o.startswith('.')]
        for x in d: os.makedirs(output_dir/(Path(p)/Path(x)).relative_to(path), exist_ok=True)

SEP = '▁'

# Quick check of Path.suffix behavior
fname = path/'labels.csv'
fname.suffix
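# Illustrative end-to-end call of `tokenize1` (requires spaCy's blank English tokenizer; the
# expected tokens in the comment are worked out by hand from the rules above and are an
# approximation, not a stored notebook result).
tokenize1("This movie was soooooo good!")
# -> ['xxbos', 'xxmaj', 'this', 'movie', 'was', 's', 'xxrep', '6', 'o', 'good', '!', 'xxeos']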
def tok_folder(path, extensions=['.txt'], include=None, output_dir=None, n_workers=4,
               pre_rules=None, post_rules=None, tok_func=SpacyTokenizer, **tok_kwargs):
    "Tokenize all text files in `path` in parallel, writing results (and a token counter) to `output_dir`"
    path = Path(path)
    fnames = get_files(path, extensions=extensions, recurse=True, include=include)
    output_dir = Path(ifnone(output_dir, path.parent/f'{path.name}_tok'))
    create_folders(path, output_dir, include=include)
    pre_rules  = [read_text] + listify(ifnone(pre_rules,  default_pre_rules.copy()))
    post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
    output_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)

    def _output(o, tok):
        # Write the tokens (joined by SEP) plus a .len file containing the token count
        out = output_dir/o.relative_to(path)
        with open(out, 'w') as f: f.write(SEP.join(tok))
        with open(out.parent/f'{out.stem}.len', 'w') as f: f.write(str(len(tok)))
        return 1

    processes = [Process(target=tok_items,
                         args=(batch, tok_func, pre_rules, post_rules, _output, output_queue),
                         kwargs={'data_queue': data_queue, **tok_kwargs})
                 for i,batch in enumerate(np.array_split(fnames, n_workers))]
    for p in processes: p.start()
    counter = Counter()
    for _ in progress_bar(fnames, leave=False): _ = output_queue.get()
    for _ in processes: counter.update(data_queue.get())
    for p in processes: p.join()
    pickle.dump(counter, open(output_dir/'counter.pkl','wb'))

# test
# This assumes `tok_folder` has already been run on the full IMDB dataset (with
# include=['train', 'test', 'unsup']), so the tokenized copies exist on disk.
path = untar_data(URLs.IMDB)
fnames = get_files(path, extensions=['.txt'], recurse=True, include=['train', 'test', 'unsup'])
tok_path = path.parent/'imdb_tok'
assert tok_path.exists()
# Take one file randomly
idx = random.randint(0, len(fnames)-1)
# Check we have the corresponding tokenized version...
tok_fname = tok_path/(fnames[idx].relative_to(path))
assert tok_fname.exists()
# ...and that it matches what tokenize1 gives on the raw text
text = read_text(fnames[idx])
tok = tokenize1(text)
assert SEP.join(tok) == read_text(tok_fname)
len_fname = tok_fname.parent/f'{tok_fname.stem}.len'
assert len(tok) == int(read_text(len_fname))

def join_texts(idx, df, mark_fields=False):
    "Join the texts of row `idx` of `df`, optionally prefixing each field with `FLD i`"
    return ' '.join([(f'{FLD} {i} ' if mark_fields else '') + t for i,t in enumerate(df.iloc[int(idx)].values)])

def tok_df(df, text_cols, n_workers=4, pre_rules=None, post_rules=None, mark_fields=None,
           tok_func=SpacyTokenizer, **tok_kwargs):
    "Tokenize the texts in `text_cols` of `df` in parallel, returning a new dataframe and a token counter"
    text_cols = listify(text_cols)
    mark_fields = ifnone(mark_fields, len(text_cols) > 1)
    pre_rules = listify(ifnone(pre_rules, default_pre_rules.copy()))
    pre_rules = [partial(join_texts, df=df[text_cols], mark_fields=mark_fields)] + pre_rules
    post_rules = listify(ifnone(post_rules, default_post_rules.copy()))
    output_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)

    def _output(o, tok): return (o,tok)

    processes = [Process(target=tok_items,
                         args=(batch, tok_func, pre_rules, post_rules, _output, output_queue),
                         kwargs={'data_queue': data_queue, **tok_kwargs})
                 for i,batch in enumerate(np.array_split(range(len(df)), n_workers))]
    for p in processes: p.start()
    lengths,outputs,counter = np.zeros(len(df)),np.zeros(len(df), dtype=object),Counter()
    for _ in progress_bar(range(len(df)), leave=False):
        i,tok = output_queue.get()
        lengths[i],outputs[i] = len(tok),SEP.join(tok)
    for _ in processes: counter.update(data_queue.get())
    for p in processes: p.join()
    other_cols = [c for c in df.columns if c not in text_cols]
    res = df[other_cols].copy()
    res['text'],res['text_lengths'] = outputs,lengths
    return res, counter

# test
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
out,cnt = tok_df(df, text_cols='text')
test_eq(set(out.columns), set(list(df.columns)+['text_lengths']))
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), out['text'][idx])
test_eq(len(tok), out['text_lengths'][idx])

# With two fields, mark_fields becomes True by default
df['text1'] = df['text']
out,cnt = tok_df(df, text_cols=['text', 'text1'])
idx = random.randint(0, len(df)-1)
text = f"{FLD} 0 {df['text'][idx]} {FLD} 1 {df['text1'][idx]}"
tok = tokenize1(text)
test_eq(SEP.join(tok), out['text'][idx])
test_eq(len(tok), out['text_lengths'][idx])
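# Hand-worked example of the `join_texts` pre-rule used by `tok_df` (hypothetical two-column
# dataframe; the expected string in the comment is not a stored notebook result).
df_demo = pd.DataFrame({'title': ['A great film'], 'review': ['Loved it']})
join_texts(0, df_demo, mark_fields=True)
# -> 'xxfld 0 A great film xxfld 1 Loved it'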
def tok_csv(fname, text_cols, outname=None, n_workers=4, pre_rules=None, post_rules=None,
            mark_fields=None, tok_func=SpacyTokenizer, header='infer', chunksize=None, **tok_kwargs):
    "Tokenize the texts in `text_cols` of the csv in `fname`, optionally by chunks, writing the result to `outname`"
    df = pd.read_csv(fname, header=header, chunksize=chunksize)
    outname = Path(ifnone(outname, fname.parent/f'{fname.stem}_tok.csv'))
    kwargs = dict(n_workers=n_workers, pre_rules=pre_rules, post_rules=post_rules,
                  mark_fields=mark_fields, tok_func=tok_func, **tok_kwargs)
    if chunksize is None:
        out,cnt = tok_df(df, text_cols, **kwargs)
        out.to_csv(outname, header=header, index=False)
    else:
        cnt = Counter()
        for i,dfp in enumerate(df):
            out,c = tok_df(dfp, text_cols, **kwargs)
            out.to_csv(outname, header=header if i==0 else None, index=False, mode='w' if i==0 else 'a')
            cnt.update(c)
    pickle.dump(cnt, open(outname.parent/'counter.pkl', 'wb'))

# test
path = untar_data(URLs.IMDB_SAMPLE)
tok_csv(path/'texts.csv', 'text')
assert (path/'texts_tok.csv').exists()
df = pd.read_csv(path/'texts.csv')
df_tok = pd.read_csv(path/'texts_tok.csv')
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), df_tok['text'][idx])
test_eq(len(tok), df_tok['text_lengths'][idx])

# test (same thing, processing the csv by chunks)
path = untar_data(URLs.IMDB_SAMPLE)
tok_csv(path/'texts.csv', 'text', chunksize=500)
assert (path/'texts_tok.csv').exists()
df = pd.read_csv(path/'texts.csv')
df_tok = pd.read_csv(path/'texts_tok.csv')
test_eq(len(df_tok), len(df))
idx = random.randint(0, len(df)-1)
text = df['text'][idx]
tok = tokenize1(text)
test_eq(SEP.join(tok), df_tok['text'][idx])
test_eq(len(tok), df_tok['text_lengths'][idx])

import collections

class ReadTokens(Transform):
    "Read a pre-tokenized text (from a file or a string) back into a list of tokens"
    def __call__(self, o):
        text = read_text(o) if isinstance(o, Path) else str(o)
        return text.split(SEP)
    def decode(self, o): return SEP.join(o)
    def show(self, x, ax): print(x)

class Numericalize(MultiCategorize):
    "Map tokens to their index in `vocab` (unknown tokens go to index 0)"
    _order = 5
    def __init__(self, vocab):
        self.vocab = vocab
        self.o2i = collections.defaultdict(int, {w:i for i,w in enumerate(vocab)})

class Text(Item):
    tfm = [ReadTokens, Numericalize]

def text_getter(suf='', **kwargs):
    def _inner(o, **kwargs): return get_files(o/suf, extensions=['.txt'], recurse=True)
    return _inner

class ImdbData(DataBlock):
    types = (Text,Item)
    get_items = text_getter()
    split = random_splitter()
    label_func = lambda fn,self: int(read_text(fn.parent/f'{fn.stem}.len'))

path = untar_data(URLs.IMDB)
path_tok = path.parent/'imdb_tok'
counter = pickle.load(open(path_tok/'counter.pkl', 'rb'))
vocab = [w for w,c in counter.most_common(60000) if c >= 2]

dblk = ImdbData(path_tok, tfms_x=[ReadTokens(), Numericalize(vocab)])
dsrc = dblk.datasource()
x,y = dsrc.get(0,0)
t = dsrc.decode((x,y))
t

class LM_PreLoader():
    "View a collection of numericalized texts as one long stream, cut into rows read `bptt` tokens at a time"
    def __init__(self, fl, lengths=None, bs=64, bptt=70, shuffle=False):
        self.fl,self.bs,self.bptt,self.shuffle = fl,bs,bptt,shuffle
        self.lengths = [len(o[0]) for o in fl] if lengths is None else lengths
        self.n_batch = sum(self.lengths) // bs
        self.batchify()

    def __len__(self): return ((self.n_batch-1) // self.bptt) * self.bs

    def __getitem__(self, i):
        # Row (i % bs) of the batch starts at position (i % bs) * n_batch in the stream;
        # batch (i // bs) is bptt tokens further along that row.
        k = (i % self.bs) * self.n_batch + (i // self.bs) * self.bptt
        item_idx = (self.cumlen > k).nonzero().min().item()
        offset = k if item_idx==0 else k-self.cumlen[item_idx-1]
        text = self.fl[item_idx][0][offset:]
        while len(text) <= self.bptt:
            item_idx += 1
            text += self.fl[item_idx][0]
        return tensor(text[:self.bptt]),tensor(text[1:self.bptt+1])

    def batchify(self):
        self.idxs = torch.randperm(len(self.fl)) if self.shuffle else tensor(range(len(self.fl)))
        self.cumlen = (tensor(self.lengths)[self.idxs] if self.shuffle else tensor(self.lengths)).cumsum(0)
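# Minimal sketch of the LM_PreLoader indexing scheme on toy data (two hypothetical
# "documents" of 50 tokens each, not the IMDB data): the concatenated stream is cut into
# `bs` contiguous rows of `n_batch` tokens, and sample i reads `bptt` tokens starting at
# (i % bs) * n_batch + (i // bs) * bptt, with the target shifted by one token.
toy = [(list(range(0, 50)), 0), (list(range(50, 100)), 0)]
toy_ds = LM_PreLoader(toy, bs=4, bptt=5)     # n_batch = 100 // 4 = 25
x0,y0 = toy_ds[0]
assert x0.tolist() == list(range(0, 5))      # row 0 starts at the beginning of the stream
assert y0.tolist() == list(range(1, 6))      # targets are the inputs shifted by one
x1,_ = toy_ds[1]                             # row 1 starts n_batch tokens further along
assert x1.tolist() == list(range(25, 30))
x4,_ = toy_ds[4]                             # in the next batch, row 0 continues where it left off
assert x4.tolist() == list(range(5, 10))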
# test
# `lengths` is assumed to hold the precomputed token counts of the items in `dsrc[0]`
# (e.g. gathered from the `.len` files used as labels above).
ds = LM_PreLoader(dsrc[0], lengths=lengths)
x,y = ds[0]
test_eq(x[1:], y[:-1])
x0,x1 = dsrc.get(0,0)[0],dsrc.get(1,0)[0]
test_eq(x, tensor(x0+x1)[:70])
test_eq(ds[64][0], tensor(x0+x1)[70:140])
k = ds.n_batch
x,y = ds[1]
offset = k - ds.cumlen[1262]
test_eq(x, tensor(dsrc.get(1263,0)[0][offset:offset+70]))

data = DataLoader(ds, 64, shuffle=False, num_workers=4)
%time for (x,y) in progress_bar(data): pass