Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Equivalent of asyncio.Queues with worker "threads"

I'm trying to figure out how to port a threaded program to use asyncio. I have a lot of code which synchronizes around a few standard library Queues, basically like this:

import queue, random, threading, time  q = queue.Queue()  def produce():     while True:         time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds         q.put(random.random())  def consume():     while True:          value = q.get(block=True)         print("Consumed", value)  threading.Thread(target=produce).start() threading.Thread(target=consume).start() 

One thread creates values (possibly user input), and another thread does something with them. The point is that these threads are idle until there's new data, at which point they wake up and do something with it.

I'm trying to implement this pattern using asyncio, but I can't seem to figure out how to make it "go".

My attempts look more or less like this (and don't do anything at all).

import asyncio, random  q = asyncio.Queue()  @asyncio.coroutine def produce():     while True:          q.put(random.random())         yield from asyncio.sleep(0.5 + random.random())  @asyncio.coroutine def consume():     while True:         value = yield from q.get()         print("Consumed", value)  # do something here to start the coroutines. asyncio.Task()?   loop = asyncio.get_event_loop() loop.run_forever() 

I've tried variations on using coroutines, not using them, wrapping stuff in Tasks, trying to make them create or return futures, etc.

I'm starting to think that I have the wrong idea about how I should be using asyncio (maybe this pattern should be implemented in a different way that I'm not aware of). Any pointers would be appreciated.

like image 759
Seth Avatar asked May 26 '14 06:05

Seth


People also ask

Is Asyncio better than threading?

One of the cool advantages of asyncio is that it scales far better than threading . Each task takes far fewer resources and less time to create than a thread, so creating and running more of them works well. This example just creates a separate task for each site to download, which works out quite well.

Is Asyncio queue thread-safe?

Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code. Note that methods of asyncio queues don't have a timeout parameter; use asyncio. wait_for() function to do queue operations with a timeout. See also the Examples section below.

Does Asyncio use multiple threads?

In asyncio's model of concurrency we have only one thread executing Python at any given time.


1 Answers

Yes, exactly. Tasks are your friends:

import asyncio, random  q = asyncio.Queue()  @asyncio.coroutine def produce():     while True:         yield from q.put(random.random())         yield from asyncio.sleep(0.5 + random.random())  @asyncio.coroutine def consume():     while True:         value = yield from q.get()         print("Consumed", value)   loop = asyncio.get_event_loop() loop.create_task(produce()) loop.create_task(consume()) loop.run_forever() 

asyncio.ensure_future can be used for task creation also.

And please keep in mind: q.put() is a coroutine, so you should to use yield from q.put(value).

UPD

Switched from asyncio.Task()/asyncio.async() to new brand API loop.create_task() and asyncio.ensure_future() in example.

like image 192
Andrew Svetlov Avatar answered Sep 28 '22 05:09

Andrew Svetlov