In Python, what's the idiomatic way to establish one-way communication between two threading.Thread instances? Call them thread a and thread b.
a is the producer: it continuously generates values for b to consume.
b is the consumer: it reads one value generated by a, processes that value with a coroutine, then reads the next value, and so on.
Illustration:
    import asyncio
    import threading
    import time

    q = very_magic_queue.Queue()  # hypothetical queue usable from both sides

    def worker_of_a(q):
        while True:
            q.put(1)
            time.sleep(1)

    # Thread's first positional parameter is `group`, so pass `target=` explicitly
    a = threading.Thread(target=worker_of_a, args=(q,))
    a.start()

    async def loop(q):
        while True:
            # v must be processed in the same order as they are produced
            v = await q.get()
            print(v)

    async def foo():
        pass

    async def b_main(q):
        loop_fut = asyncio.ensure_future(loop(q))
        foo_fut = asyncio.ensure_future(foo())
        _ = await asyncio.wait([loop_fut, foo_fut], ...)
        # blah blah blah

    def worker_of_b(q):
        # each thread needs its own event loop
        asyncio.set_event_loop(asyncio.new_event_loop())
        asyncio.get_event_loop().run_until_complete(b_main(q))

    b = threading.Thread(target=worker_of_b, args=(q,))
    b.start()
Of course the above code doesn't work, because queue.Queue.get cannot be awaited, and asyncio.Queue is not thread-safe, so it cannot be used from another thread.
I also need a communication channel from b to a.
It would be great if the solution could also work with gevent.
Thanks :)
I've used Janus to solve this problem. It's a Python library that gives you a thread-safe queue with two views: a synchronous one (sync_q) for the thread and an asynchronous one (async_q) for asyncio code.
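    import asyncio
    import janus

    def threaded(sync_q):
        # runs in a worker thread and uses the blocking interface
        for i in range(100):
            sync_q.put(i)
        sync_q.join()

    async def async_code(async_q):
        # runs on the event loop and uses the awaitable interface
        for i in range(100):
            val = await async_q.get()
            assert val == i
            async_q.task_done()

    async def main():
        queue = janus.Queue()
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(None, threaded, queue.sync_q)
        await async_code(queue.async_q)
        await fut  # wait for the worker thread to finish

    asyncio.run(main())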
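If adding a dependency is not an option, a similar effect can probably be had with the standard library alone. The following is only a rough sketch, not part of Janus: the producer thread hands values to an asyncio.Queue through loop.call_soon_threadsafe, which preserves ordering, and the reverse channel from b to a is a plain queue.Queue. The names producer, consumer, b_main and run_demo are made up for illustration, and for brevity the event loop runs in the calling thread rather than in a dedicated thread b.

```python
import asyncio
import queue
import threading


def producer(loop, to_b, to_a, n, acks):
    """Thread a: send n values to b and collect one reply per value."""
    for i in range(n):
        # asyncio.Queue is not thread-safe, so schedule the put on b's loop.
        loop.call_soon_threadsafe(to_b.put_nowait, i)
        # Reverse channel (b -> a): this get() blocks only thread a.
        acks.append(to_a.get())
    # Sentinel telling the consumer that no more values will arrive.
    loop.call_soon_threadsafe(to_b.put_nowait, None)


async def consumer(to_b, to_a, received):
    """Side b: process values one at a time, in production order."""
    while True:
        v = await to_b.get()
        if v is None:
            break
        received.append(v)
        # Unbounded queue.Queue, so put_nowait never blocks the event loop.
        to_a.put_nowait(v * 2)


async def b_main(n):
    loop = asyncio.get_running_loop()
    to_b = asyncio.Queue()   # a -> b, touched only from the loop thread
    to_a = queue.Queue()     # b -> a, thread-safe
    received, acks = [], []
    a = threading.Thread(target=producer, args=(loop, to_b, to_a, n, acks))
    a.start()
    await consumer(to_b, to_a, received)
    # Join the producer thread without blocking the event loop.
    await loop.run_in_executor(None, a.join)
    return received, acks


def run_demo(n=5):
    return asyncio.run(b_main(n))


if __name__ == "__main__":
    print(run_demo())
```

The trade-off compared with Janus is that the producer needs a reference to the consumer's event loop, and a bounded queue would need extra care because put_nowait raises when the queue is full.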