I'm doing some file parsing that is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50 MB of RAM. The task is parallelisable, and I've set it up to use concurrent.futures below to parse each file in a separate process:
from concurrent import futures

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary with the future as the key and the filename as the value
    jobs = {}
    # Loop through the files and submit the parse function for each one,
    # sending the filename to it. The results can come back in any order.
    for this_file in files_list:
        job = executor.submit(parse_function, this_file, **parser_variables)
        jobs[job] = this_file

    # Get the completed jobs whenever they are done
    for job in futures.as_completed(jobs):
        # Get the result (job.result()) and the file the job is based on (jobs[job])
        results_list = job.result()
        this_file = jobs[job]
        # Delete the entry from the dict as we don't need to store it
        del jobs[job]
        # Post-processing (putting the results into a database)
        post_process(this_file, results_list)
The problem is that when I run this using futures, RAM usage rockets and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB in size. Once the results have been through post_process, the application has no further need of them. As you can see, I'm trying del jobs[job] to clear items out of jobs, but this has made no difference; memory usage remains unchanged and seems to increase at the same rate.
I've also confirmed it's not because it's waiting on the post_process function, by only using a single process plus throwing in a time.sleep(1).
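A rough sketch of that check, for reference (psutil here is just my assumed way of reading the process's resident memory, and time.sleep(1) stands in for the real post_process):

import time
import psutil  # assumed available; only used to watch the process's RSS

with futures.ProcessPoolExecutor(max_workers=1) as executor:  # single worker
    jobs = {executor.submit(parse_function, f, **parser_variables): f for f in files_list}
    for job in futures.as_completed(jobs):
        results_list = job.result()
        this_file = jobs.pop(job)
        time.sleep(1)  # stand-in for post_process, to rule it out
        print(this_file, "done; RSS is now", psutil.Process().memory_info().rss // (1024 * 1024), "MB")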
There's nothing in the futures docs about memory management, and while a brief search indicates it has come up before in real-world applications of futures (Clear memory in python loop and http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures), the answers don't translate to my use-case (they're all concerned with timeouts and the like).
So, how do you use concurrent.futures without running out of RAM? (Python 3.5)
I'll take a shot (might be a wrong guess...):
You might need to submit your work bit by bit, since on each submit you're making a copy of parser_variables, which may end up chewing through your RAM.
Here is working code with "<----" on the interesting parts:
with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary with the future as the key and the filename as the value
    jobs = {}
    # Loop through the files and submit the parse function for each one,
    # sending the filename to it. The results can come back in any order.
    files_left = len(files_list)  # <----
    files_iter = iter(files_list)  # <------

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break  # limit the job submission for now

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):
            files_left -= 1  # one down - many to go... <---
            # Get the result (job.result()) and the file the job is based on (jobs[job])
            results_list = job.result()
            this_file = jobs[job]
            # Delete the entry from the dict as we don't need to store it
            del jobs[job]
            # Post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break  # give a chance to add more jobs <-----
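If it helps, the same throttling idea can also be expressed with itertools.islice and futures.wait instead of the counter. This is only a sketch, reusing the names from the question (parse_function, post_process, parser_variables, files_list) and MAX_JOBS_IN_QUEUE from above:

import itertools
from concurrent import futures

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    files_iter = iter(files_list)
    jobs = {}
    while True:
        # Top the queue back up so at most MAX_JOBS_IN_QUEUE futures are outstanding
        for this_file in itertools.islice(files_iter, MAX_JOBS_IN_QUEUE - len(jobs)):
            jobs[executor.submit(parse_function, this_file, **parser_variables)] = this_file
        if not jobs:
            break  # nothing left to submit and nothing in flight
        # Block until at least one job finishes, then drain everything that is done
        done, _ = futures.wait(jobs, return_when=futures.FIRST_COMPLETED)
        for job in done:
            this_file = jobs.pop(job)  # drop the future so its result can be freed
            post_process(this_file, job.result())

Either way, the point is the same: never keep more than a handful of futures (and their multi-megabyte results) alive at once.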