Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

asyncio: running task only if all other tasks are awaiting

I am currently running some endless tasks using asyncio.wait

I need a special function to run when all the others are on await

import asyncio 

async def special_function():
    while True:
        # does some work, 
        # Passes control back to controller to run main_tasks
        # if they are no longer waiting.
        await asyncio.sleep(0)

async def handler():
    tasks = [task() for task in main_tasks]

    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(special_function())

    await asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(handler())

How can I get the special_function to only be run when all main_tasks are on await?


Edit:

What I mean by "all main_tasks are on await": all main_tasks are not ready to continue, e.g. are in asyncio.sleep(100) or I/O bound and still waiting for data.

Therefore the main_tasks cannot continue and the event loop runs the special_function while the tasks are in this state, NOT every iteration of the event loop.


Edit 2:

My use case:

The main_tasks are updating a data structure with new data from web-sockets.

The special_function transfers that data to another process upon an update signal from that process. (multiprocessing with shared variables and data structures)

It needs to be the most up to date data it can be when it transfers, there cannot be pending updates from main_tasks.

This is why I only want to run special_function when there are no main_tasks with new data available to be processed. (i.e. all waiting on await)

like image 566
Zak Stucke Avatar asked May 26 '19 00:05

Zak Stucke


People also ask

How many times should Asyncio run () be called?

How many times should Asyncio run () be called? It should be used as a main entry point for asyncio programs, and should ideally only be called once. New in version 3.7.

What does await do in Asyncio?

The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g() , this is how await tells the event loop, “Suspend execution of g() until whatever I'm waiting on—the result of f() —is returned.

When should I use async await Python?

They are generally used for cooperative tasks and behave like Python generators. An async function uses the await keyword to denote a coroutine. When using the await keyword, coroutines release the flow of control back to the event loop. To run a coroutine, we need to schedule it on the event loop.


2 Answers

I tried to write a test for the 'task not ready to run' condition. I think asyncio does not expose details from the scheduler. The developers have clearly stated they want to keep freedom for changing asyncio internals without breaking backward compatibility.

In asyncio.Task there is this comment (note: _step() runs the task coroutine till the next await):

# An important invariant maintained while a Task not done:
#   
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.

But that internal variable is not in the API, of course.

You can get some limited access to _fut_waiter by reading the output of repr(task), but the format seems to be not reliable either, so I would not depend on somehing like this:

PENDINGMSG = 'wait_for=<Future pending '

if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()

Anyway, I think you are trying to be too perfect. You want to know if there is new data in other tasks. What if the data is in asyncio buffers? Kernel buffer? Network card receive buffer? ... You could never know if new data arrives the next millisecond.

My suggestion: write all updates to a single queue. Check that queue as the only source of updates. If the queue is empty, publish the last state.

like image 114
VPfB Avatar answered Sep 17 '22 13:09

VPfB


This is what I'd do:

  1. I'd not use your special function.

  2. Each data update needs a separate generation ID (4 byte integer), and I'd only put in the ID in shared memory.

Both processes are running independently, I assume.

  1. The subscriber keeps the generation ID as local. When it notices the generation ID is changed in shared memory, then the read new data from the file.

  2. Data is stored on tmpfs (/tmp) so it's on memory. You can create your own tmpfs if suited. It's fast.

Here is why:

  • To make sure the subscriber doesn't fetch half-baked data in shared memory, it has to be protected by semaphore. It's a PITA
  • By using file, you can carry variable size data. This may not apply to you. One of hard things to solve when using shared memory is to have enough space but not waste space. Using file solves this problem.
  • By using 4-byte int generation ID, updating ID is atomic. This is a huge advantage.

So, as one of your tasks receives new data, open a file, write to it, and after closing the file descriptor, you write out the generation ID to shared memory. Before updating generation ID, you can delete the file safely. The subscriber - if it has opened file, it will complete reading the file, and if it tries to open it, it fails to open so it has to wait for the next generation anyway. If machine crashes, /tmp is gone so you don't need to worry about cleaning up files. You can even write a new task which solo job is to delete files in /tmp that is older generations if you like.

like image 25
Naoyuki Tai Avatar answered Sep 19 '22 13:09

Naoyuki Tai