 

How would I go about using concurrent.futures and queues for a real-time scenario?


It is fairly easy to do parallel work with Python 3's concurrent.futures module as shown below.

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()

It is also very handy to insert items into, and retrieve them from, a Queue.

import queue

q = queue.Queue()

for task in tasks:
    q.put(task)

while not q.empty():
    q.get()
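As a side note on the queue part: when another thread is still feeding the queue, polling empty() can exit too early, so I understand a blocking get with a timeout is the usual pattern. A minimal sketch of what I mean (the one-second timeout is just an arbitrary choice):

import queue

q = queue.Queue()

# Block for up to one second waiting for the next item; if nothing
# arrives within that window, queue.Empty is raised and we stop.
while True:
    try:
        task = q.get(timeout=1.0)
    except queue.Empty:
        break
    print('processing', task)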

I have a script running in the background listening for updates. In theory, as those updates arrive, I would queue them and work on them concurrently using the ThreadPoolExecutor.

Individually, all of these components work in isolation and make sense, but how do I go about using them together? I don't know whether it is possible to feed the ThreadPoolExecutor work from a queue in real time, or whether the data to work on has to be predetermined.

In a nutshell, all I want to do is receive updates of, say, 4 messages a second, push them onto a queue, and have concurrent.futures work on them as they arrive. Otherwise I am stuck with a sequential approach, which is slow.
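For reference, the sequential approach I am stuck with looks roughly like this (do_work and the listener that fills the queue are placeholders, not real code I can share):

import queue

q = queue.Queue()

# A background listener (not shown) calls q.put(message) as updates arrive.
# Handling them one at a time means each message blocks the next one.
while True:
    message = q.get()      # wait for the next update
    do_work(message, 60)   # nothing else happens until this returns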

Let's take the canonical example in the Python documentation below:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

The list of URLS is fixed. Is it possible to feed this list in real time and have the workers process the items as they come in, perhaps from a queue for management purposes? I am a bit confused about whether my approach is actually possible.

Asked Jan 14 '17 by Ali Gajani
1 Answer

Below is the example from the Python docs, expanded to take its work from a queue. A change to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, to allow new work to be started while waiting for other work to complete.

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]

Output from fetching each url twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
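For a truly open-ended stream, like the 4-messages-a-second case in the question, the same loop applies; the only new question is how to stop. One common approach is to push a sentinel through the queue when the stream ends. A minimal sketch of that variation (STOP and handle_message are illustrative names, not part of the example above):

import concurrent.futures
import queue

q = queue.Queue()
STOP = object()  # sentinel pushed by whoever decides the stream is over

def handle_message(message):
    """ Placeholder for the real per-message work """
    return len(message)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {}
    running = True
    while running or futures:
        # pull in new work, blocking briefly so we do not busy-wait
        if running:
            try:
                item = q.get(timeout=0.25)
            except queue.Empty:
                item = None
            if item is STOP:
                running = False
            elif item is not None:
                futures[executor.submit(handle_message, item)] = item

        # harvest finished futures without holding up new submissions;
        # once the stream is closed, block until the stragglers finish
        done, _ = concurrent.futures.wait(
            futures, timeout=0 if running else None,
            return_when=concurrent.futures.FIRST_COMPLETED)
        for future in done:
            item = futures.pop(future)
            print(item, '->', future.result())

Whoever produces the messages just calls q.put(message) for each update, and q.put(STOP) once when there is nothing more to send.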
Answered by Stephen Rauch