I am trying to read and process 1000s of files, but unfortunately it takes about 3x as long to process the file as it does to read it in from disk, so I would like to process these files as they are read in (and while I am continuing to read in additional files).
In a perfect world, I have a generator which reads one file at a time, and I would like to pass this generator to a pool of workers which process items from the generator as they are (slowly) generated.
Here's an example:
import os
from multiprocessing import Pool

def process_file(file_string):
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path + part, 'rb').read() for part in os.listdir(path)))
The only issue with the code above is that all the files are read into memory before the pool begins, which means I need to wait for the disk to read everything in, and I also consume a large amount of memory.
Pool.map and Pool.map_async listify the iterable passed to them, so your generator will always be realized fully before processing even begins.
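A quick toy sketch (not your real code; slow_source below just fakes a slow disk with sleeps) makes this visible: every "producing" line prints before any result comes back, because map consumes the whole generator up front.

import time
from multiprocessing import Pool

def slow_source():
    # Stands in for slowly reading files off disk
    for i in range(4):
        print('producing', i)
        time.sleep(0.5)
        yield i

def work(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # All four "producing" lines appear before any work is handed out
        print(pool.map(work, slow_source()))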
The various Pool.imap* functions appear to process inputs as generators, so you might be able to change:
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
to:
# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))
and get the same results without slurping before processing, but AFAICT, they'll still try to fully populate the queues as fast as they can, which could lead to a lot of data outstanding and excessive memory usage; beyond that, you'll be reading all the data in one process, then sending all of it over IPC, which means you're still mostly bottlenecked on I/O.
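For example, if order doesn't matter and you can consume results one at a time, you could iterate directly over imap_unordered (handle_result here is just a hypothetical stand-in for whatever you do with each processed file):

for processed in pool.imap_unordered(
        process_file,
        (open(path + part, 'rb').read() for part in os.listdir(path))):
    handle_result(processed)  # hypothetical consumer of a single result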
In your position, I'd move the read into the task itself (and if you can, avoid reading in the whole file, processing it by line or by block instead of reading the whole thing at once). You'd get parallel reads, less IPC, and you won't risk slurping all the files before the first few are even processed; you'll never have more files open than you have workers. So the end result would look like:
import os
from multiprocessing import Pool

def process_file(path):
    with open(path, 'rb') as f:
        file_string = f.read()
    ... same as before ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
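And if the processing itself can work on a piece at a time, a rough line-by-line sketch (process_line is a hypothetical stand-in for your real per-line work) keeps each worker holding only one line in memory at a time:

def process_file(path):
    # Handle the file incrementally instead of slurping it all at once
    results = []
    with open(path, 'rb') as f:
        for line in f:
            results.append(process_line(line))  # hypothetical per-line helper
    return results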
You are reading the files into the parent's memory and then transferring the payload into the children. That's rather inefficient. Send just the filename and let the children do the I/O. If the result is a bunch of text that you plan to write to a file, do that in the child also.
map will normally issue large blocks of work in one shot to reduce communication overhead with its pool workers. That's probably why you get the big memory spike. Passing just the filename solves that problem, but setting a small chunksize is still beneficial when you have uneven processing time among the workers.
import os
from multiprocessing import Pool

def process_file(filename):
    with open(filename, 'rb') as fp:
        file_string = fp.read()
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file,
                   (path + part for part in os.listdir(path)),
                   chunksize=1)
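And if, as suggested, the children write their own output as well, a sketch along these lines (reusing the pool and path from above; the .upper() call and the '.out' naming are just placeholders for your real processing and naming scheme) keeps the large payload out of IPC entirely, since only the output path comes back to the parent:

def process_and_save(filename):
    # Read, process, and write entirely inside the worker so only the small
    # output path travels back over IPC.
    with open(filename, 'rb') as fp:
        file_string = fp.read()
    processed_file = file_string.upper()   # stand-in for the real processing
    out_path = filename + '.out'           # assumed output naming
    with open(out_path, 'wb') as out:
        out.write(processed_file)
    return out_path

out_paths = pool.map(process_and_save,
                     (path + part for part in os.listdir(path)),
                     chunksize=1)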