Is there any way to control the scheduling priority among all coroutines that are ready to run?
Specifically, I have several coroutines handling streaming I/O from the network into several queues, a second set of coroutines that ingest the data from the queues into a data structure. These ingestion coroutines signal a third set of coroutines that analyze that data structure whenever new data is ingested.
Data arrival from the network is an infinite stream with a non-deterministic message rate. I want the analysis step to run as soon as new data arrives, but not before all pending data is processed. The problem I see is that depending on the order of scheduling, an analysis coroutine could run before a reader coroutine that also had data ready, so the analysis coroutine can't even check the ingestion queues for pending data because it may not have been read off the network yet, even though those reader coroutines were ready to run.
One solution might be to structure the coroutines into priority groups so that the reader coroutines would always be scheduled before the analysis coroutines if they were both able to run, but I didn't see a way to do this.
Is there a feature of asyncio that can accomplish this prioritization? Or perhaps I'm asking the wrong question and I can restructure the coroutines such that this can't happen (but I don't see it).
-- edit --
Basically I have a N coroutines that look something like this:
while True:
data = await socket.get()
ingestData(data)
self.event.notify()
So the problem I'm running into is that there's no way for me to know that any of the other N-1 sockets have data ready while executing this coroutine so I can't know whether or not I should notify the event. If I could prioritize these coroutines above the analysis coroutine (which is awaiting self.event.wait()) then I could be sure none of them were runnable when the analysis coroutine is scheduled.
gather() method - It runs awaitable objects (objects which have await keyword) concurrently.
asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
ensure_future is a method to create Task from coroutine . It creates tasks in different ways based on argument (including using of create_task for coroutines and future-like objects). create_task is an abstract method of AbstractEventLoop . Different event loops can implement this function different ways.
The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g() , this is how await tells the event loop, “Suspend execution of g() until whatever I'm waiting on—the result of f() —is returned.
asyncio
doesn't support explicitly specifying coroutine priorities, but it is straightforward to achieve the same effect them with the tools provided by the library. Given the example in your question:
async def process_pending():
while True:
data = await socket.get()
ingestData(data)
self.event.notify()
You could await the sockets directly using asyncio.wait
, and then you would know which sockets are actionable, and only notify the analyzers after all have been processed. For example:
def _read_task(self, socket):
loop = asyncio.get_event_loop()
task = loop.create_task(socket.get())
task.__process_socket = socket
return task
async def process_pending_all(self):
tasks = {self._read_task(socket) for socket in self.sockets}
while True:
done, not_done = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
ingestData(task.result())
not_done.add(self._read_task(task.__process_socket))
tasks = not_done
self.event.notify()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With