
How can I synchronize asyncio with other OS threads?

I have a program whose main thread spawns a second thread that runs asyncio. Are there any tools for synchronizing these two threads? If everything were asyncio, I could use its synchronization primitives, e.g.:

import asyncio

async def taskA(lst, evt):
    print('Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

async def main():
    lst = []
    evt = asyncio.Event()
    await asyncio.gather(taskA(lst, evt), taskB(lst, evt))

asyncio.run(main())

However, this does not work across threads, because the asyncio primitives are not thread-safe. If I simply use a threading.Event instead, waiting on it blocks the asyncio thread. I figured out that I can defer the wait to an executor:

import asyncio
import threading

def taskA(lst, evt):
    print('Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    # Wait on the threading.Event in the default executor so the loop is not blocked
    await asyncio.get_running_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    # asyncio.run creates a fresh event loop for this thread and closes it afterwards
    asyncio.run(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

However, dedicating an executor thread just to wait on an event seems unnatural. Is this how it is supposed to be done? Or is there another way to wait asynchronously for synchronization between OS threads?

jdehesa asked Dec 10 '25 15:12



1 Answer

A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.
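
A minimal sketch of that approach, assuming the thread that runs taskA is handed a reference to the event loop. The extra `loop` parameter, the `main` coroutine, and the thread setup are illustrative and not part of the question's code:

```python
import asyncio
import threading

def taskA(lst, evt, loop):
    # Runs in a plain OS thread.
    print('Appending 1')
    lst.append(1)
    # asyncio.Event is not thread-safe, so schedule evt.set() on the loop's
    # own thread instead of calling it directly from here.
    loop.call_soon_threadsafe(evt.set)

async def taskB(lst, evt):
    # Suspends this coroutine without blocking the event loop.
    await evt.wait()
    value = lst.pop()
    print('Retrieved:', value)
    return value

async def main():
    lst = []
    evt = asyncio.Event()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=taskA, args=(lst, evt, loop))
    thread.start()
    result = await taskB(lst, evt)
    # Setting the event is the thread's last action, so this join
    # should return almost immediately.
    thread.join()
    return result

if __name__ == '__main__':
    asyncio.run(main())
```

No executor thread is parked on a wait here: the coroutine suspends on the asyncio.Event, and the worker thread only asks the loop to set it.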

To pass values and exceptions between the two, you can use futures; however, at that point you are reinventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well create a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:

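import asyncio
import concurrent.futures

# A single-worker "pool" that plays the role of the worker thread
worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_running_loop()
    # taskA is a plain function taking only lst here; no event is needed
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())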

The semantics are the same as in your version with an explicit queue - the queue is still there, it's just inside the ThreadPoolExecutor.
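
For comparison, the futures-based approach mentioned above might look roughly like the sketch below. The helper names `run_in_thread` and `call_from_thread` are hypothetical, and the sketch ignores cancellation of the awaiting coroutine; it is meant only to show how much of run_in_executor you end up rebuilding by hand:

```python
import asyncio
import threading

def run_in_thread(loop, fut, func, *args):
    # Runs in a plain OS thread and reports the outcome back to the loop.
    try:
        result = func(*args)
    except Exception as exc:
        # Futures are not thread-safe, so complete them on the loop's thread.
        loop.call_soon_threadsafe(fut.set_exception, exc)
    else:
        loop.call_soon_threadsafe(fut.set_result, result)

async def call_from_thread(func, *args):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    thread = threading.Thread(target=run_in_thread, args=(loop, fut, func, *args))
    thread.start()
    # Awaiting the future yields the value or re-raises the exception.
    return await fut

if __name__ == '__main__':
    print(asyncio.run(call_from_thread(lambda x: x + 1, 1)))
```

This passes both return values and exceptions across the thread boundary, which is what run_in_executor already does for you.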

user4815162342 answered Dec 13 '25 06:12
