from fastai.text import *
path = untar_data(URLs.IMDB)
il = (TextList.from_folder(path, processor=[OpenFileProcessor(), TokenizeProcessor()])
      .filter_by_folder(include=['train', 'test', 'unsup']))
# Run the two processors by hand so that only the tokenization step gets timed.
opener = OpenFileProcessor()
opener.process(il)
tokenizer = TokenizeProcessor()
%time tokenizer.process(il)
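A quick peek at what that produced (a minimal check, not part of the timing; the exact tokens depend on your fastai and spaCy versions): after both processors have run, il.items should hold one list of tokens per review.
il.items[0][:15]   # first few tokens of the first review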
from exp.nb_12 import *
path = datasets.untar_data(datasets.URLs.IMDB)
il = TextList.from_files(path, include=['train', 'test', 'unsup'])
tp = TokenizeProcessor()
texts = [read_file(f) for f in il.items]
%time tokens = tp(texts)
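Same kind of sanity check for this version (again just a peek; output varies with the spaCy version): there should be one token list per file.
len(tokens), tokens[0][:15]   # one token list per file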
This version doesn't kill and respawn a worker for every chunk: each worker process stays alive for its whole share of the files and streams its results back through queues.
from exp.nb_12 import *
path = datasets.untar_data(datasets.URLs.IMDB)
il = TextList.from_files(path, include=['train', 'test', 'unsup'])
from multiprocessing import Process, Queue, cpu_count
def text_gen(fnames):
    # Read each file, apply the pre-rules to its text, and yield the cleaned string.
    for fn in fnames:
        with open(fn, 'r') as r: txt = r.read()
        for rule in default_pre_rules: txt = rule(txt)
        yield txt
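text_gen can be tried on its own before wiring it into spaCy; this throwaway check (it only assumes the default_pre_rules exported by exp.nb_12) cleans one review and shows the start of the result.
sample = next(text_gen(il.items[:1]))   # clean the first review only
sample[:200]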
def process_files(fnames, data_queue, progress_queue, lang='en', batch_size=5000):
    # Worker: tokenize all of `fnames` with spaCy, reporting progress one document at a time.
    nlp = spacy.blank(lang, disable=["parser", "tagger", "ner"])
    for w in default_spec_tok: nlp.tokenizer.add_special_case(w, [{ORTH: w}])
    tokens = []
    for doc in nlp.pipe(text_gen(fnames), batch_size=batch_size):
        toks = [t.text for t in doc]
        for rule in default_post_rules: toks = rule(toks)
        tokens.append(toks)
        progress_queue.put(1)   # one tick per tokenized document
    data_queue.put(tokens)      # send the whole batch back once at the end
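Before spinning up several processes, process_files can be exercised in the main process with a pair of plain queues; a throwaway check on ten files (the slice size is arbitrary).
dq, pq = Queue(), Queue()                # unbounded queues are fine for a tiny test
process_files(il.items[:10], dq, pq)     # ten files is an arbitrary sample
toks = dq.get()
len(toks), toks[0][:15]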
def tokenize(fnames, lang='en', n_workers=4, chunk_size=5000):
    # Split the files across `n_workers` long-lived processes and collect their tokens.
    progress_queue,data_queue = Queue(maxsize=n_workers),Queue(maxsize=n_workers)
    processes = [Process(target=process_files,
                         args=(batch, data_queue, progress_queue, lang, chunk_size))
                 for batch in np.array_split(fnames, n_workers)]
    for p in processes: p.start()
    tokens = []
    # One progress tick arrives per document, across all workers.
    for _ in progress_bar(fnames): _ = progress_queue.get()
    # Each worker sends back one list of results; they arrive in completion order,
    # so `tokens` isn't guaranteed to follow the original order of `fnames`.
    for _ in processes: tokens += data_queue.get()
    for p in processes: p.join()
    return tokens
%time t = tokenize(il.items)
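One last sanity check on the parallel version: each file yields exactly one token list, so the counts should match, although the results come back in worker-completion order rather than the order of il.items.
assert len(t) == len(il.items)   # one token list came back per file
t[0][:15]                        # first document from the first worker batch collected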