I'm trying to process a file where every line is a JSON document. The file can range from hundreds of MBs to a few GBs, so I wrote a generator to fetch each document from the file line by line.
    import codecs
    import json

    def jl_file_iterator(file):
        with codecs.open(file, 'r', 'utf-8') as f:
            for line in f:
                document = json.loads(line)
                yield document
My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the parallel-processing code:
    threads = 4
    files, i = [], 1
    for jl in jl_file_iterator(input_path):
        files.append(jl)
        if i % (threads) == 0:
            # pool.map(processFile, files)
            parallelProcess(files, o)
            files = []
        i += 1
    if files:
        parallelProcess(files, o)
        files = []
This is the code where the actual processing happens:
    from multiprocessing import Process

    def parallelProcess(files, outfile):
        processes = []
        for i in range(len(files)):
            p = Process(target=processFile, args=(files[i],))
            processes.append(p)
            p.start()
        for i in range(len(files)):
            processes[i].join()

    def processFile(doc):
        extractors = {}
        # ... do some processing on doc
        o.write(json.dumps(doc) + '\n')
As you can see, I wait for all 4 lines to finish processing before I send the next 4 files to process. But what I would like is this: as soon as one process finishes processing a line, I want the next line to be assigned to the released processor. How do I do that?
PS: The problem is that since it's a generator I cannot load all the lines and use something like map to run the processes.
Thanks for your help
As @pvg said in a comment, a (bounded) queue is the natural way to mediate among a producer and consumers with different speeds, ensuring they all stay as busy as possible but without letting the producer get way ahead.
Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue get bigger than that.
In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel.
    import multiprocessing as mp

    NCORE = 4

    def process(q, iolock):
        from time import sleep
        while True:
            stuff = q.get()
            if stuff is None:
                break
            with iolock:
                print("processing", stuff)
            sleep(stuff)

    if __name__ == '__main__':
        q = mp.Queue(maxsize=NCORE)
        iolock = mp.Lock()
        pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
        for stuff in range(20):
            q.put(stuff)  # blocks until q below its max size
            with iolock:
                print("queued", stuff)
        for _ in range(NCORE):  # tell workers we're done
            q.put(None)
        pool.close()
        pool.join()
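For completeness, here is a minimal sketch of how the same pattern could be adapted to your use case: the main process feeds raw lines into the bounded queue, and each worker does the json.loads plus the per-document work in parallel. The file name 'input.jl' and the do_some_processing helper are placeholders standing in for your actual input and processing logic.

    import json
    import multiprocessing as mp

    NCORE = 4

    def do_some_processing(doc):
        # placeholder for your real per-document work
        return doc

    def worker(q, iolock):
        while True:
            line = q.get()
            if line is None:           # sentinel: no more lines
                break
            doc = json.loads(line)     # parse in the worker, in parallel
            result = do_some_processing(doc)
            with iolock:               # serialize writes so output lines don't interleave
                print(json.dumps(result))

    if __name__ == '__main__':
        q = mp.Queue(maxsize=NCORE)
        iolock = mp.Lock()
        pool = mp.Pool(NCORE, initializer=worker, initargs=(q, iolock))
        with open('input.jl', 'r', encoding='utf-8') as f:
            for line in f:
                q.put(line)            # blocks when the queue is full
        for _ in range(NCORE):         # one sentinel per worker
            q.put(None)
        pool.close()
        pool.join()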