 

Concurrent.futures usage guide - a simple example of using both threading and processing

I want to enable parallel processing/threading of my program using the concurrent.futures module.

Unfortunately I can't seem to find any nice, simple, idiot-proof examples of using the concurrent.futures module. The examples I have found typically require more advanced knowledge of Python, or of processing/threading concepts and jargon.

The below is a simplified, self-contained example based on my program: there's a purely CPU-bound task ideal for multiprocessing, and a separate I/O-bound task inserting into a database (SQLite). In my program I've already converted the CPU-bound part to use the multiprocessing Pool class, but because the results from the CPU-bound task are all collected up waiting for the tasks to finish, it uses massive amounts of memory. I'm therefore looking for a combination of threading and processing, which I believe concurrent.futures can do for me fairly simply.

So how do I convert the below into something that uses this module?

import sqlite3

#Stand in CPU intensive task
def calculate(value):
    return value * 10

#Stand in Thread I/O intensive task
def output(value):
    global db

    if (value % 1000) == 0:
        db.execute('delete from test_table')

    db.execute('insert into test_table (result) values (?)', (value,))

def main():
    global db
    results = []

    db  = sqlite3.connect('e:\\z_dev\\test.sqlite')
    db.cursor()

    #=========
    #Perform CPU intensive task
    for i in range(1000):
        results.append( calculate(i))

    #Perform Threading intensive task
    for a in results:
        output(a)
    #=========

    db.commit()
    db.close()

if __name__ == '__main__':
    main()

I'm looking for an answer that doesn't use any fancy/complex python. Or a nice clear simple explanation, or ideally both!

Thanks

Edit: My current "multiprocessing" implementation. It's probably wrong, but it seems to work. There's no threading whatsoever. This goes inside the "#=========" part of the code above.

#Multiprocessing
pool = multiprocessing.Pool(None)
for i in range(1000):
    results.append(pool.apply_async(calculate, (i,)))
pool.close()
pool.join()

for i in range(len(results)):
    results[i] = results[i].get()

#Complete lack of threading; but if I had it, it'd be here:
for a in results:
    output(a)
Asked Dec 26 '13 by GIS-Jonathan
1 Answer

concurrent.futures has a minimalistic API. It's easy to use for very straightforward problems, but you don't have a very straightforward problem. If you did, you would already have solved it ;-)

You didn't show any of the multiprocessing.Pool code you wrote, but that would be a more promising place to start - assuming you want to solve the problem more than you want to confirm your hope that it must be easy to do if only you switched to a weaker API ;-)

"An obvious" way to proceed using multiprocessing is to use the Pool.apply_async() method, put the async result objects on a bounded Queue.Queue, and have threads in your main program pull those off the Queue and wait for the results to show up. This is easy enough, but it's not magic. It solves your problem because bounded Queues are the canonical way to mediate between producers and consumers that run at different speeds. Nothing in concurrent.futures addresses that problem directly, and it's at the heart of your "massive amounts of memory" problem.

# Define global result_queue only in the main program.
import queue
result_queue = queue.Queue(100)  # pick a reasonable max size based on your problem

# Run this in as many threads as you like.
def consume_results():
    while True:
        a = result_queue.get()
        if a is None:
            break
        output(a.get())  # `output()` is your function

...
# main program passes out work, after starting threads
for i in range(1000):
    # the .put() will block so long as the queue is at its max size
    result_queue.put(pool.apply_async(calculate, args=(i,)))
# add sentinels to let threads know they're done
for i in range(number_of_threads_you_started):
    result_queue.put(None)

That's the kind of thing you need to keep producers and consumers roughly in balance, and there's nothing in any standard library that will do it for you by magic.

EDIT - fleshing it out

Here's a complete, executable example anyone with Python 3 can run. Notes:

  • It doesn't use your code fragments, because those rely on an external database module not everyone can run.
  • It sticks to concurrent.futures to manage both processes and threads. It's not really harder to use multiprocessing and threading instead - indeed, the way threads are used here, it would be a little easier with threading directly. But this way is clear enough.
  • A concurrent.futures Future object is basically the same thing as a multiprocessing async result object - the API functionalities are just spelled differently.
  • Your problem is not straightforward, because it has multiple stages that can run at different speeds. Again, nothing in any standard library can hide the potentially bad consequences of that by magic. Creating your own bounded queue remains the best solution to that. Memory use here will remain modest for any sane value of MAX_QUEUE_SIZE.
  • You generally don't want more CPU-bound worker processes than the number of cores you have available minus one: the main program also needs cycles to run, and so does the OS.
  • Once you're used to this stuff, all the comments in this code would be annoying, like seeing the comment "increment by 1" on the code line i += 1 ;-)
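To make the Future/AsyncResult correspondence concrete, here's a tiny side-by-side sketch (my own illustration, using thread pools only so it runs anywhere; `square` is just a stand-in task):

```python
import concurrent.futures as cf
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

if __name__ == "__main__":
    # concurrent.futures: submit() returns a Future; .result() blocks for the value.
    with cf.ThreadPoolExecutor(2) as executor:
        future = executor.submit(square, 7)
        print(future.result())       # 49

    # multiprocessing: apply_async() returns an AsyncResult; .get() blocks for the value.
    with ThreadPool(2) as pool:
        async_result = pool.apply_async(square, (7,))
        print(async_result.get())    # 49
```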

Here's the code:

import concurrent.futures as cf
import threading
import queue

NUM_CPUS = 3
NUM_THREADS = 4
MAX_QUEUE_SIZE = 20

# Runs in worker processes.
def producer(i):
    return i + 10

def consumer(i):
    global total
    # We need to protect this with a lock because
    # multiple threads in the main program can
    # execute this function simultaneously.
    with sumlock:
        total += i

# Runs in threads in main program.
def consume_results(q):
    while True:
        future = q.get()
        if future is None:
            break
        else:
            consumer(future.result())

if __name__ == "__main__":
    sumlock = threading.Lock()
    result_queue = queue.Queue(MAX_QUEUE_SIZE)
    total = 0
    NUM_TO_DO = 1000
    with cf.ThreadPoolExecutor(NUM_THREADS) as tp:
        # start the threads running `consume_results`
        for _ in range(NUM_THREADS):
            tp.submit(consume_results, result_queue)
        # start the worker processes
        with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
            for i in range(NUM_TO_DO):
                # blocks while the queue is at its max size
                result_queue.put(pp.submit(producer, i))
        # tell threads we're done
        for _ in range(NUM_THREADS):
            result_queue.put(None)
    print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)

If all is well, this is the expected output:

got 509500 expected 509500
Answered by Tim Peters