
Python multiprocessing on a generator that reads files in

I am trying to read and process thousands of files, but unfortunately it takes about 3x as long to process a file as it does to read it in from disk, so I would like to process these files as they are read in (while I continue to read in additional files).

In a perfect world, I have a generator which reads one file at a time, and I would like to pass this generator to a pool of workers which process items from the generator as they are (slowly) generated.

Here's an example:

import os
from multiprocessing import Pool

def process_file(file_string):
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

The only issue with the code above is that all of the files are read into memory before the pool begins working, which means I need to wait for the disk to read everything in, and I also consume a large amount of memory.

asked Dec 07 '15 by mgoldwasser

2 Answers

Pool.map and Pool.map_async listify the iterable passed to them, so your generator will always be realized fully before processing even begins.

The various Pool.imap* functions appear to process inputs as generators, so you might be able to change:

results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

to:

# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))

and get the same results without slurping before processing. But AFAICT, they'll still try to fully populate the queues as fast as they can, which could lead to a lot of data outstanding and excessive memory usage; beyond that, you'd be reading all the data in one process and then sending all of it over IPC, which means you're still mostly bottlenecked on I/O.
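
For illustration (a sketch, not part of the original answer; the trivial process_file below just returns the file size), consuming imap_unordered output one result at a time looks roughly like this:

import os
from multiprocessing import Pool

def process_file(file_string):
    # Stand-in for the real processing: just report the size.
    return len(file_string)

if __name__ == '__main__':
    path = 'some/path/'
    file_strings = (open(os.path.join(path, part), 'rb').read()
                    for part in os.listdir(path))
    with Pool(processes=4) as pool:
        # imap_unordered yields each result as soon as any worker finishes it,
        # so you handle outputs one at a time instead of holding the full list.
        for result in pool.imap_unordered(process_file, file_strings):
            print(result)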

In your position, I'd move the read into the task itself (and, if you can, process by line or by block instead of reading the whole file at once). You'd get parallel reads and less IPC, and you won't risk slurping all the files before the first few are even processed; you'll never have more files open than you have workers. The end result would look like:

def process_file(path):
    with open(path, 'rb') as f:
        file_string = f.read()
    ... same as before ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
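
Not from the original answer, just a sketch of the "process it by line or by block" variant mentioned above; process_line is a hypothetical per-line helper standing in for the real work:

import os
from multiprocessing import Pool

def process_line(line):
    # Hypothetical per-line work; substitute the real logic.
    return len(line)

def process_file(path):
    # Stream the file line by line instead of slurping it whole.
    with open(path, 'rb') as f:
        return sum(process_line(line) for line in f)

if __name__ == '__main__':
    path = 'some/path/'
    with Pool(processes=4) as pool:
        paths = (os.path.join(path, part) for part in os.listdir(path))
        for result in pool.imap(process_file, paths):
            print(result)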

answered by ShadowRanger

You are reading the files into the parent's memory and then transferring the payload into the children. That's rather inefficient. Send just the filename and let the children do the I/O. If the result is a bunch of text that you plan to write to a file, do that in the child also.

map will normally issue large blocks of work in one shot to reduce communication overhead with its pool workers, and that's probably why you get the big memory spike. Passing just the filename solves that problem, but setting a small chunksize is still beneficial when processing time is uneven across the workers.

def process_file(filename):
    with open(filename, 'rb') as fp:
        file_string = fp.read()
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (path+part for part in os.listdir(path)), chunksize=1)
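
To illustrate the "do that in the child also" advice above, here is a sketch (with a made-up .out naming scheme and a dummy uppercase transform, not from the original answer) that writes each result inside the worker and only sends the output path back:

import os
from multiprocessing import Pool

def process_file(filename):
    with open(filename, 'rb') as fp:
        file_string = fp.read()
    processed = file_string.upper()  # stand-in for the real processing
    out_name = filename + '.out'     # made-up output naming scheme
    # Writing in the worker keeps the large payload out of IPC;
    # only the small output path travels back to the parent.
    with open(out_name, 'wb') as out:
        out.write(processed)
    return out_name

if __name__ == '__main__':
    path = 'some/path/'
    files = [os.path.join(path, part) for part in os.listdir(path)]
    with Pool(processes=4) as pool:
        written = pool.map(process_file, files, chunksize=1)
    print(written)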

answered by tdelaney