 

Python multiprocessing with generator

I'm trying to process a file (every line is a JSON document). The size of the file can range from hundreds of MBs to GBs, so I wrote a generator to fetch each document from the file line by line:

import codecs
import json

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document

My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1

if files:
    parallelProcess(files, o)
    files = []

This is my code where the actual processing happens:

from multiprocessing import Process

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ... do some processing on doc ...
    o.write(json.dumps(doc) + '\n')

As you can see, I wait for all 4 lines to finish processing before I send the next 4 files to be processed. But what I would like is that as soon as one process finishes, the next line gets assigned to the released processor. How do I do that?

PS: The problem is that since it's a generator, I cannot load all the files and use something like map to run the processes.

Thanks for your help

Muthu Rg asked Mar 28 '17


1 Answer

As @pvg said in a comment, a (bounded) queue is the natural way to mediate among a producer and consumers with different speeds, ensuring they all stay as busy as possible but without letting the producer get way ahead.

Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue get bigger than that.

In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel.

import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
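To connect that pattern back to the question, here's a minimal sketch of one possible adaptation, not the answer's own code: it reuses the question's input_path, adds a hypothetical out_path, has the main process feed raw lines into the bounded queue, and lets each worker do the json.loads plus the (still elided) processing before writing its result.

import json
import multiprocessing as mp

NCORE = 4

def process(q, iolock, out_path):
    # each worker opens its own handle on the output file
    with open(out_path, 'a', encoding='utf-8') as out:
        while True:
            line = q.get()
            if line is None:          # sentinel: no more lines coming
                break
            doc = json.loads(line)    # parsing happens in the worker, in parallel
            # ... do some processing on doc ...
            with iolock:              # serialize writes so lines don't interleave
                out.write(json.dumps(doc) + '\n')
                out.flush()

if __name__ == '__main__':
    input_path = 'input.jl'   # hypothetical file names for the sketch
    out_path = 'output.jl'
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock, out_path))
    with open(input_path, 'r', encoding='utf-8') as f:
        for line in f:
            q.put(line)               # blocks whenever all workers are busy
    for _ in range(NCORE):            # one sentinel per worker
        q.put(None)
    pool.close()
    pool.join()

Because each worker pulls a new line as soon as it finishes the previous one, a fast worker never waits on a slow one, and json.loads itself runs in parallel as suggested above.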
Tim Peters answered Sep 21 '22