
Using Concurrent Futures without running out of RAM

I'm doing some file parsing that is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50MB of RAM. The task is parallelisable, and I've set it up to use concurrent futures, as shown below, to parse each file in a separate process:

    from concurrent import futures
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary mapping each future (key) to the filename it was submitted for (value)
        jobs = {}

        # Loop through the files and run the parse function for each one, passing it the filename.
        # The results can come back in any order.
        for this_file in files_list:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            # Fetch the job's result (job.result()) and the file the job was based on (jobs[job])
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)

The problem is that when I run this using futures, RAM usage rockets, and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB in size. Once the results have been through post_process, the application has no further need of them. As you can see, I'm trying del jobs[job] to clear items out of jobs, but this has made no difference: memory usage keeps increasing at the same rate.

I've also confirmed it's not because it's waiting on the post_process function by only using a single process, plus throwing in a time.sleep(1).

There's nothing in the futures docs about memory management, and while a brief search indicates it has come up before in real-world applications of futures ("Clear memory in python loop" and http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures), the answers don't translate to my use-case (they're all concerned with timeouts and the like).

So, how do you use Concurrent futures without running out of RAM? (Python 3.5)

GIS-Jonathan asked Jan 13 '16 15:01


1 Answer

I'll take a shot (might be a wrong guess...)

You might need to submit your work bit by bit, since on each submit you're making a copy of parser_variables, which may end up chewing up your RAM.

Here is working code, with "<----" marking the interesting parts:

    from concurrent import futures

    MAX_JOBS_IN_QUEUE = 12  # assumed cap on in-flight jobs; tune it to your RAM  <----

    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary mapping each future (key) to the filename it was submitted for (value)
        jobs = {}

        files_left = len(files_list)   # <----
        files_iter = iter(files_list)  # <----

        while files_left:
            # Submit jobs until the cap is reached or the files run out.
            # The results can come back in any order.
            for this_file in files_iter:
                job = executor.submit(parse_function, this_file, **parser_variables)
                jobs[job] = this_file
                if len(jobs) >= MAX_JOBS_IN_QUEUE:
                    break  # limit the job submission for now

            # Get the completed jobs whenever they are done
            for job in futures.as_completed(jobs):

                files_left -= 1  # one down, many to go...  <----

                # Fetch the job's result (job.result()) and the file the job was based on (jobs[job])
                results_list = job.result()
                this_file = jobs[job]

                # Delete the job from the dict as we don't need to store it.
                del jobs[job]

                # post-processing (putting the results into a database)
                post_process(this_file, results_list)
                break  # give a chance to add more jobs  <----
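
A possible variation on the same idea, as a minimal sketch rather than a definitive fix: futures.wait with return_when=FIRST_COMPLETED can do the "wait for one, then top up" step without the nested break logic. The helper name run_bounded, its parameters, and parse_stub are made up for illustration, and a ThreadPoolExecutor stands in for the ProcessPoolExecutor so the snippet runs anywhere without a __main__ guard.

```python
from concurrent import futures
from itertools import islice


def run_bounded(executor, func, items, handle_result, max_in_flight=12):
    """Run func(item) for every item, keeping at most max_in_flight futures alive."""
    items_iter = iter(items)
    pending = {}  # future -> item it was submitted for

    # Prime the executor with the first batch of jobs.
    for item in islice(items_iter, max_in_flight):
        pending[executor.submit(func, item)] = item

    while pending:
        # Block until at least one of the pending jobs has finished.
        done, _ = futures.wait(pending, return_when=futures.FIRST_COMPLETED)
        for job in done:
            item = pending.pop(job)  # drop our reference to the finished future
            handle_result(item, job.result())
            # Top up: submit at most one new job per finished job.
            for next_item in islice(items_iter, 1):
                pending[executor.submit(func, next_item)] = next_item


def parse_stub(name):
    # Hypothetical stand-in for parse_function.
    return [name.upper()]


collected = {}
file_names = ["a.txt", "b.txt", "c.txt", "d.txt", "e.txt"]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # collected.__setitem__ plays the role of post_process(this_file, results_list).
    run_bounded(executor, parse_stub, file_names, collected.__setitem__, max_in_flight=2)
```

With the code from the question, func could be functools.partial(parse_function, **parser_variables) and handle_result could be post_process. Whether this cures the memory growth depends on where the memory is actually going, so treat max_in_flight as a knob to experiment with.
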
Yoav Glazner answered Oct 21 '22 18:10