I am currently running some endless tasks using asyncio.wait.
I need a special function to run when all the others are waiting on an await.
    import asyncio

    async def special_function():
        while True:
            # Does some work, then passes control back to the event loop
            # so main_tasks can run if they are no longer waiting.
            await asyncio.sleep(0)

    async def handler():
        # main_tasks is a list of coroutine functions defined elsewhere.
        tasks = [asyncio.create_task(task()) for task in main_tasks]
        # Adding the task that I want to run when all main_tasks are awaiting:
        tasks.append(asyncio.create_task(special_function()))
        await asyncio.wait(tasks)

    asyncio.run(handler())
How can I get special_function to run only when all main_tasks are on await?
What I mean by "all main_tasks are on await": none of the main_tasks is ready to continue, e.g. they are in asyncio.sleep(100) or are I/O bound and still waiting for data.
Therefore the main_tasks cannot continue, and the event loop runs special_function while the tasks are in this state, NOT on every iteration of the event loop.
My use case:
The main_tasks are updating a data structure with new data from web-sockets.
The special_function transfers that data to another process upon an update signal from that process (multiprocessing with shared variables and data structures).
It needs to be the most up-to-date data possible when it transfers; there cannot be pending updates from main_tasks.
This is why I only want to run special_function when there are no main_tasks with new data available to be processed (i.e. all are waiting on await).
I tried to write a test for the 'task not ready to run' condition. As far as I can tell, asyncio does not expose details from the scheduler. The developers have clearly stated that they want to keep the freedom to change asyncio internals without breaking backward compatibility.
In asyncio.Task there is this comment (note: _step() runs the task coroutine until the next await):
# An important invariant maintained while a Task not done:
#
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.
But that internal variable is not in the API, of course.
You can get some limited access to _fut_waiter by reading the output of repr(task), but the format does not seem to be reliable either, so I would not depend on something like this:

    PENDINGMSG = 'wait_for=<Future pending '

    if all(PENDINGMSG in repr(t) for t in monitored_tasks):
        do_something()
Anyway, I think you are trying to be too perfect. You want to know if there is new data in other tasks. What if the data is in asyncio buffers? Kernel buffer? Network card receive buffer? ... You could never know if new data arrives the next millisecond.
My suggestion: write all updates to a single queue. Check that queue as the only source of updates. If the queue is empty, publish the last state.
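A minimal sketch of that single-queue idea, assuming the updates are simple (key, value) pairs. The names producer, drain and publisher are made up for illustration, and the sleeps stand in for the real websocket reads and for the update signal from the other process:

```python
import asyncio

async def producer(name, queue, count):
    # Hypothetical stand-in for a websocket reader: emits `count` updates.
    for i in range(count):
        await asyncio.sleep(0.01)
        queue.put_nowait((name, i))

def drain(queue, state):
    # Apply every update that is already queued; return how many were applied.
    applied = 0
    while True:
        try:
            key, value = queue.get_nowait()
        except asyncio.QueueEmpty:
            return applied
        state[key] = value
        applied += 1

async def publisher(queue, state, published, rounds):
    for _ in range(rounds):
        # Stand-in for waiting on the other process's update signal.
        await asyncio.sleep(0.05)
        # After draining, the queue is empty: no known pending updates.
        drain(queue, state)
        published.append(dict(state))

async def main():
    queue = asyncio.Queue()
    state, published = {}, []
    await asyncio.gather(
        producer("a", queue, 5),
        producer("b", queue, 5),
        publisher(queue, state, published, rounds=3),
    )
    return published

def run_demo():
    return asyncio.run(main())

if __name__ == "__main__":
    print(run_demo()[-1])
```

Because drain() never awaits, no other task can run between emptying the queue and taking the snapshot, so the published state contains everything that had reached the queue at that moment. Data still sitting in socket or kernel buffers is not covered, as noted above.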
This is what I'd do:
I'd not use your special function.
Each data update needs a separate generation ID (a 4-byte integer), and I'd only put the ID in shared memory.
Both processes are running independently, I assume.
The subscriber keeps the last generation ID it has seen as a local variable. When it notices that the generation ID in shared memory has changed, it reads the new data from the file.
Data is stored on tmpfs (/tmp), so it stays in memory. You can create your own tmpfs if that suits you better. It's fast.
Here is why:
So, when one of your tasks receives new data, open a file, write to it, and after closing the file descriptor, write the new generation ID to shared memory. Before updating the generation ID, you can safely delete the old file. If the subscriber has already opened the file, it will finish reading it; if it tries to open it after the deletion, the open fails, so it has to wait for the next generation anyway. If the machine crashes, /tmp is gone, so you don't need to worry about cleaning up files. If you like, you can even write a new task whose sole job is to delete files in /tmp that belong to older generations.
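Roughly, the scheme could look like the sketch below. It is only an illustration under a few assumptions: there is a single writer process, the data is JSON-serialisable, and the file naming (gen-N.json) plus the helper names path_for, publish and poll are made up here. The demo runs both sides in one process and uses a temporary directory instead of a dedicated tmpfs mount:

```python
import json
import os
import shutil
import tempfile
from multiprocessing import Value

def path_for(directory, generation):
    # Hypothetical naming scheme: one file per generation.
    return os.path.join(directory, f"gen-{generation}.json")

def publish(directory, gen_id, data):
    # Writer side: write and close the new file, delete the old one,
    # and only then make the new generation ID visible.
    old = gen_id.value
    new = (old + 1) & 0xFFFFFFFF  # keep the ID within 4 bytes
    with open(path_for(directory, new), "w") as f:
        json.dump(data, f)
    try:
        os.remove(path_for(directory, old))
    except FileNotFoundError:
        pass  # first generation: nothing to delete yet
    gen_id.value = new

def poll(directory, gen_id, last_seen):
    # Subscriber side: returns (generation, data); data is None when
    # there is nothing new or the file has already been replaced.
    current = gen_id.value
    if current == last_seen:
        return last_seen, None
    try:
        with open(path_for(directory, current)) as f:
            return current, json.load(f)
    except FileNotFoundError:
        return last_seen, None  # wait for the next generation

def run_demo():
    directory = tempfile.mkdtemp()
    # Unsigned int in shared memory; lock=False assumes a single writer.
    gen_id = Value("I", 0, lock=False)
    seen, results = 0, []
    try:
        for payload in ({"price": 1}, {"price": 2}):
            publish(directory, gen_id, payload)
            seen, data = poll(directory, gen_id, seen)
            results.append((seen, data))
        seen, data = poll(directory, gen_id, seen)  # nothing new this time
        results.append((seen, data))
    finally:
        shutil.rmtree(directory)
    return results

if __name__ == "__main__":
    print(run_demo())
```

In a real setup the Value would be created before starting the subscriber process and passed to it, and the directory would live on /tmp or your own tmpfs mount.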