I'm consuming messages from a RabbitMQ channel, and I would like to process n messages at a time. I think I could use a ProcessPoolExecutor (or ThreadPoolExecutor), but I wonder whether it's possible to know if there's a free worker in the pool.
This is what I want to write:
from concurrent import futures
executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
    print("actually consuming a single message")
def on_message(channel, method_frame, header_frame, message):
    # this method is called once per incoming message
    future = executor.submit(consume, message)
    block_until_a_free_worker(executor, future)
def block_until_a_free_worker(executor, future):
    running.append(future) # this grows forever!
    futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()
I need to write the function block_until_a_free_worker. This function should be able to check whether all the workers are in use.
Alternatively, I could use a blocking executor.submit option, if one is available.
I also tried a different approach: updating the list of futures as they complete. I explicitly added and removed futures from the list and then waited like this:
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
It doesn't seem to be a solution, though.
I could set a future.add_done_callback and possibly count the running instances...
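Roughly, that counting idea might look like the sketch below. CountingPool and has_free_worker are hypothetical names invented for illustration, not part of concurrent.futures, and a ThreadPoolExecutor is assumed only to keep the example self-contained.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class CountingPool:
    """Hypothetical wrapper that counts submitted tasks that have not finished."""

    def __init__(self, max_workers):
        self.max_workers = max_workers
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._running = 0

    def submit(self, fn, *args):
        # Count the task before handing it to the pool.
        with self._lock:
            self._running += 1
        future = self.pool.submit(fn, *args)
        # The callback fires when the future finishes (or is cancelled).
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future):
        with self._lock:
            self._running -= 1

    def has_free_worker(self):
        """Return True if fewer than max_workers tasks are unfinished."""
        with self._lock:
            return self._running < self.max_workers
```

This only answers "is a worker free right now?"; it does not block, so the caller would still have to poll or wait on something else.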
Any hints or ideas? Thank you.
I gave a similar answer here.
A semaphore limits access to a resource to a fixed number of workers.
from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor 
class TaskManager:
    def __init__(self, workers):
        self.pool = ProcessPoolExecutor(max_workers=workers)
        self.workers = Semaphore(workers)
    def new_task(self, function):
        """Start a new task, blocks if all workers are busy."""
        self.workers.acquire()  # flag a worker as busy
        future = self.pool.submit(function, ... )
        future.add_done_callback(self.task_done)
    def task_done(self, future):
        """Called once task is done, releases one worker."""
        self.workers.release()
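For completeness, here is a self-contained sketch of the same semaphore idea wired to the on_message callback from the question. BoundedExecutor is a hypothetical name, and a ThreadPoolExecutor is assumed so the example runs without pickling concerns; swapping in a ProcessPoolExecutor should work the same way as long as the submitted function and its arguments can be pickled.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedExecutor:
    """Hypothetical wrapper: submit() blocks while `workers` tasks are unfinished."""

    def __init__(self, workers):
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.slots = threading.Semaphore(workers)

    def submit(self, function, *args):
        self.slots.acquire()  # blocks here when every worker is busy
        try:
            future = self.pool.submit(function, *args)
        except Exception:
            self.slots.release()  # submission failed, give the slot back
            raise
        # Free the slot once the task finishes, whether it succeeded or raised.
        future.add_done_callback(lambda _: self.slots.release())
        return future


executor = BoundedExecutor(workers=5)


def consume(message):
    print("actually consuming a single message")
    return message


def on_message(channel, method_frame, header_frame, message):
    # Called once per incoming message; returns only after a worker slot is free.
    executor.submit(consume, message)
```

Because on_message does not return until a slot is free, the consumer loop is throttled to at most 5 messages in flight and no list of futures has to be maintained.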