Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Limiting simultaneously running asyncio coroutines with semaphores in a background thread

As an experiment with Python's new asyncio module, I created the following snippet to process a set of long running actions (jobs) in a background worker.

In an attempt to control the number of simultaneously running jobs, I introduced a semaphore in a with block (line 56). However, with the semaphore in place, it seems that the acquired slots are never released: after the first jobs complete (their callbacks are executed), the waiting jobs never start. When I ditch the with block, everything works as expected.

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):    # Thread that runs submitted coroutines on an event loop it creates in run().
    def __init__(self):
        super().__init__()
        self._keep_running = True    # Checked by process_coros(); cleared by stop().
        self._waiting_coros = Queue()    # (coroutine function, callback) pairs queued by submit_coro().
        self._tasks = []    # Every task created by process_coros() is appended here.
        self._loop = None    # Loop must be initialized in child thread.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)    # NOTE: created before run() sets up this thread's loop; the answer below identifies this as the bug.

    def stop(self):    # Ask process_coros() to leave its loop; it notices on its next iteration.
        self._keep_running = False

    def run(self):    # Thread entry point: create the loop, install it, and run process_coros() on it.
        self._loop = asyncio.new_event_loop()       # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop)          # Since this is a child thread, we need to do it manually.
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):    # coro is called later with no arguments; callback (optional) becomes the task's done-callback.
        self._waiting_coros.put((coro, callback))

    @asyncio.coroutine
    def process_coros(self):    # Repeatedly drain the queue into tasks, then sleep, until stop() is called.
        while self._keep_running:
            try:
                while True:    # Left only via the Empty exception raised by get_nowait().
                    coro, callback = self._waiting_coros.get_nowait()
                    task = asyncio.async(coro())
                    if callback:
                        task.add_done_callback(callback)
                    self._tasks.append(task)
            except Empty as e:    # Queue drained; nothing more to schedule in this round.
                pass
            yield from asyncio.sleep(3)     # sleep so the other tasks can run


background_worker = BackgroundWorker()    # Shared instance: Job.process() submits to it, main() starts and stops it.


class Job(object):    # One demo unit of work that is run on the shared background_worker.
    def __init__(self, idx):
        super().__init__()
        self._idx = idx    # Number used to label this job's printed messages.

    def process(self):    # Queue this job's coroutine function and done-callback on the worker.
        background_worker.submit_coro(self._process, self._process_callback)

    @asyncio.coroutine
    def _process(self):    # Acquire the worker's semaphore, then stand in for real work with a 2-second sleep.
        with (yield from background_worker.limit_simultaneous_processes):    # The semaphore is held for the whole block.
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            yield from asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):    # Done-callback for the task; only reports completion, the future is not inspected.
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, and wait for the user to type 'quit'.

    After 'quit' is entered the worker is asked to stop and the calling
    thread waits for the worker thread to finish.
    """
    print("starting worker...")
    background_worker.start()

    # Hand ten numbered jobs (0..9) to the worker.
    for job_number in range(10):
        Job(job_number).process()

    # Keep prompting until the exact string "quit" is entered.
    while True:
        if input("enter 'quit' to stop the program: \n") == "quit":
            break

    print("stopping...")
    background_worker.stop()
    background_worker.join()


# Run the demo only when executed as a script.
if __name__ == '__main__':
    main()

Can anyone help me shed some light on this behavior? Why isn't the semaphore released (its counter incremented) when the with block is exited?

like image 624
ImapUkua Avatar asked Dec 03 '25 15:12

ImapUkua


1 Answer

I discovered the bug: the semaphore is created in __init__(), which runs in the main thread, so it is bound to the main thread's implicit event loop instead of the loop that is explicitly created and set in run() once the thread is started.

Fixed version:

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):
    """Thread that owns an asyncio event loop and runs submitted coroutines on it.

    Other threads hand work over with :meth:`submit_coro`; the worker polls
    its queue, wraps each submitted coroutine in a task and lets the loop run
    it. ``limit_simultaneous_processes`` is a semaphore that submitted
    coroutines may acquire to limit how many of them run at the same time.

    Args:
        max_concurrent: Initial value of ``limit_simultaneous_processes``.
        poll_interval: Seconds to sleep between two polls of the queue.
    """

    def __init__(self, max_concurrent=2, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._max_concurrent = max_concurrent
        self._poll_interval = poll_interval
        self._loop = None                           # Loop must be initialized in child thread.
        self.limit_simultaneous_processes = None    # Semaphore must be initialized after the loop is set.

    def stop(self):
        """Ask the worker to finish; it notices after its current sleep ends."""
        self._keep_running = False

    def run(self):
        """Thread entry point: set up the loop and semaphore, then poll for work."""
        self._loop = asyncio.new_event_loop()       # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop)          # Since this is a child thread, we need to do it manually.
        # Created only now so the semaphore belongs to this thread's loop.
        self.limit_simultaneous_processes = asyncio.Semaphore(self._max_concurrent)
        try:
            self._loop.run_until_complete(self.process_coros())
        finally:
            # Release the loop's resources once polling has ended.
            self._loop.close()

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function for execution on the worker's loop.

        Args:
            coro: Callable taking no arguments and returning a coroutine.
            callback: Optional done-callback; it is attached to the task
                created for ``coro`` and receives that task as its argument.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule queued coroutines and sleep, repeating until stop() is called."""
        while self._keep_running:
            self._schedule_waiting()
            # Sleeping yields to the loop so the scheduled tasks can run.
            await asyncio.sleep(self._poll_interval)

    def _schedule_waiting(self):
        """Turn everything currently in the queue into tasks on the running loop."""
        while True:
            try:
                coro, callback = self._waiting_coros.get_nowait()
            except Empty:
                return
            # ensure_future replaces asyncio.async, which no longer parses
            # now that ``async`` is a reserved keyword.
            task = asyncio.ensure_future(coro())
            if callback:
                task.add_done_callback(callback)
            self._tasks.append(task)
            # Drop finished tasks so the list does not grow without bound.
            task.add_done_callback(self._forget_task)

    def _forget_task(self, task):
        """Remove a finished task from the bookkeeping list."""
        try:
            self._tasks.remove(task)
        except ValueError:
            pass  # Already removed; nothing to do.


# Shared worker instance: Job.process() submits to it, main() starts and stops it.
background_worker = BackgroundWorker()


class Job(object):
    """A demo unit of work that is executed on the shared ``background_worker``.

    Args:
        idx: Number used to label this job's printed messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Queue this job's coroutine and completion callback on the worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a free slot, then simulate two seconds of work.

        Written as a native coroutine: ``@asyncio.coroutine`` and
        ``with (yield from semaphore)`` are no longer available in current
        Python versions.
        """
        # The slot is released when the block is left, also on error.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, datetime.now() - start))

    def _process_callback(self, future):
        """Done-callback for the job's task; only reports that it was called."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, and wait for the user to type 'quit'.

    After 'quit' is entered the worker is asked to stop and the calling
    thread waits for the worker thread to finish.
    """
    print("starting worker...")
    background_worker.start()

    # Hand ten numbered jobs (0..9) to the worker.
    for job_number in range(10):
        Job(job_number).process()

    # Keep prompting until the exact string "quit" is entered.
    while True:
        if input("enter 'quit' to stop the program: \n") == "quit":
            break

    print("stopping...")
    background_worker.stop()
    background_worker.join()


# Run the demo only when executed as a script.
if __name__ == '__main__':
    main()
like image 149
ImapUkua Avatar answered Dec 05 '25 10:12

ImapUkua



Donate For Us

If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!