I want to enable parallel processing/threading of my program using the concurrent.futures module.
Unfortunately I can't seem to find any nice, simple, idiot-proof examples of using the concurrent.futures module. The ones I have found typically require more advanced knowledge of Python or of processing/threading concepts and jargon.
The below is a simplified, self-contained example based on my program: there's a purely CPU-bound task ideal for multiprocessing, and a separate IO-bound task inserting into a database (SQLite). In my program I've already converted this to use the multiprocessing Pool class, but because the results from the CPU-bound task are all collected up waiting for the tasks to finish, it uses massive amounts of memory. Thus I'm looking to use a combination of threading and processing, which I believe concurrent.futures can do for me fairly simply.
So how do I convert the below into something that uses this module?
import sqlite3

# Stand-in CPU-intensive task
def calculate(value):
    return value * 10

# Stand-in thread I/O-intensive task
def output(value):
    global db
    if (value % 1000) == 0:
        db.execute('delete from test_table')
    db.execute('insert into test_table (result) values (?)', (value,))

def main():
    global db
    results = []
    db = sqlite3.connect('e:\\z_dev\\test.sqlite')
    db.cursor()

    #=========
    # Perform CPU-intensive task
    for i in range(1000):
        results.append(calculate(i))

    # Perform threading-intensive task
    for a in results:
        output(a)
    #=========

    db.commit()
    db.close()

if __name__ == '__main__':
    main()
I'm looking for an answer that doesn't use any fancy/complex Python, or a nice clear simple explanation, or ideally both!
Thanks
Edit: My current "multiprocessor" implementation. Probably wrong, but it seems to work. No threading whatsoever. This goes inside the "#=========" part of the above.
# Multiprocessing
pool = multiprocessing.Pool(None)
for i in range(1000):
    results.append(pool.apply_async(calculate, (i,)))
pool.close()
pool.join()
for i in range(len(results)):
    results[i] = results[i].get()

# Complete lack of threading; but if I had it, it'd be here:
for a in results:
    output(a)
concurrent.futures has a minimalistic API. It's easy to use for very straightforward problems, but you don't have a very straightforward problem. If you did, you would already have solved it ;-)

You didn't show any of the multiprocessing.Pool code you wrote, but that would be a more promising place to start - assuming you want to solve the problem more than you want to confirm your hope that it must be easy to do if only you switched to a weaker API ;-)
"An obvious" way to proceed using multiprocessing
is to use the Pool.apply_async()
method, put the async result objects on a bounded Queue.Queue
, and have threads in your main program pull those off the Queue
and wait for the results to show up. This is easy enough, but it's not magic. It solves your problem because bounded Queues
are the canonical way to mediate between producers and consumers that run at different speeds. Nothing in concurrent.futures
addresses that problem directly, and it's at the heart of your "massive amounts of memory" problem.
# Define global result_queue only in the main program.
import queue
result_queue = queue.Queue(100)  # pick a reasonable max size based on your problem

# Run this in as many threads as you like.
def consume_results():
    while True:
        a = result_queue.get()
        if a is None:
            break
        output(a.get())  # `output()` is your function

...

# main program passes out work, after starting threads
for i in range(1000):
    # the .put() will block so long as the queue is at its max size
    result_queue.put(pool.apply_async(calculate, args=(i,)))

# add sentinels to let threads know they're done
for i in range(number_of_threads_you_started):
    result_queue.put(None)
That's the kind of thing you need to keep producers and consumers roughly in balance, and there's nothing in any standard library that will do it for you by magic.
EDIT - fleshing it out
Here's a complete, executable example anyone with Python 3 can run. Notes:

- It uses concurrent.futures to manage both processes and threads. It's not really harder to use multiprocessing and threading instead, and indeed the way threads are used here it would be a little easier using threading directly. But this way is clear enough.
- A concurrent.futures Future object is basically the same thing as a multiprocessing async result object - the API functionalities are just spelled differently.
- Memory use stays modest because the bounded queue never holds more than MAX_QUEUE_SIZE pending results.
- Of course, for a stand-in task this trivial, a plain loop doing i += 1 would beat all this machinery ;-)

Here's the code:
import concurrent.futures as cf
import threading
import queue

NUM_CPUS = 3
NUM_THREADS = 4
MAX_QUEUE_SIZE = 20

# Runs in worker processes.
def producer(i):
    return i + 10

def consumer(i):
    global total
    # We need to protect this with a lock because
    # multiple threads in the main program can
    # execute this function simultaneously.
    with sumlock:
        total += i

# Runs in threads in main program.
def consume_results(q):
    while True:
        future = q.get()
        if future is None:
            break
        else:
            consumer(future.result())

if __name__ == "__main__":
    sumlock = threading.Lock()
    result_queue = queue.Queue(MAX_QUEUE_SIZE)
    total = 0
    NUM_TO_DO = 1000
    with cf.ThreadPoolExecutor(NUM_THREADS) as tp:
        # start the threads running `consume_results`
        for _ in range(NUM_THREADS):
            tp.submit(consume_results, result_queue)
        # start the worker processes
        with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
            for i in range(NUM_TO_DO):
                # blocks while the queue is full (MAX_QUEUE_SIZE items)
                result_queue.put(pp.submit(producer, i))
        # tell threads we're done
        for _ in range(NUM_THREADS):
            result_queue.put(None)
    print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)
If all is well, this is the expected output:
got 509500 expected 509500