Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to write your own async/awaitable coroutine function in Python?

I'm trying to write my own awaiatbale function which could use in asyncio loop such as asyncio.sleep() method or something like these pre-awaitable implemented methods.

Here is what I've done so far:

import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

loop = asyncio.get_event_loop()
futures = [asyncio.ensure_future(async_wrapper())]
result = loop.run_until_complete(asyncio.gather(*futures))
print(result)

loop.close()

What I got as a result:

1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1

What I expect as a result:

1
10
2
20
3
30
.
.
.

[NOTE]:

  • I'm not looking for a multithread or multiprocess method.
  • This Question is almost similar to my question which has not resolved yet.
  • I'm using Python3.6
like image 466
Benyamin Jafari Avatar asked Oct 07 '19 13:10

Benyamin Jafari


People also ask

How do you write async function in Python?

To run an async function (coroutine) you have to call it using an Event Loop. Event Loops: You can think of Event Loop as functions to run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. Example 1: Event Loop example to run async Function to run a single async function: Python3.

How do you make a function Awaitable in Python?

In python 3.5. 1 one can make use of await/async, however, to use it (as I undestand), you need to have awaitable object. An awaitable object is an object that defines __await__() method returning an iterator.

How do I use async await in Python?

An async function uses the await keyword to denote a coroutine. When using the await keyword, coroutines release the flow of control back to the event loop. To run a coroutine, we need to schedule it on the event loop. After scheduling, coroutines are wrapped in Tasks as a Future object.

Is there async await in Python?

Python's asyncio package (introduced in Python 3.4) and its two keywords, async and await , serve different purposes but come together to help you declare, build, execute, and manage asynchronous code.

How to await from other coroutines in Python?

Python coroutines are awaitables and therefore can be awaited from other coroutines: import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested ()".

How to use async to launch a coroutine that returns a value?

You can use async to launch a coroutine that returns a Deferred value which can be accessed by calling await. From a simplistic perspective, you can treat it like a CompletableFuture to execute an operation on a separate thread and retrieve the result when complete. This will let you improve the performance of your application in some situations.

What are the types of awaitable objects in Python?

There are three main types of awaitable objects: coroutines, Tasks, and Futures. Coroutines. Python coroutines are awaitables and therefore can be awaited from other coroutines: import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()".

What happens when you call a coroutine in asyncio?

Calling either function would not actually run, but a coroutine object is returned. In some cases, if needed to determine the function is a coroutine or not, asyncio has a method asyncio.iscoroutinefunction (func).


2 Answers

I found a concurrency/asynchronous approach using generators. However, it's not an asyncio approach:

from collections import deque

def coro1():
    for i in range(1, 5):
        yield i

def coro2():
    for i in range(1, 5):
        yield i*10

print('Async behaviour using default list with O(n)'.center(60, '#'))
tasks = list()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.pop(0)
    try:
        print(next(task))
        tasks.append(task)
    except StopIteration:
        pass

print('Async behaviour using deque with O(1)'.center(60, '#'))
tasks = deque()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.popleft()  # select and remove a task (coro1/coro2).
    try:
        print(next(task))
        tasks.append(task)  # add the removed task (coro1/coro2) for permutation.
    except StopIteration:
        pass

Out:

########Async behaviour using default list with O(n)########
1
10
2
20
3
30
4
40
###########Async behaviour using deque with O(1)############
1
10
2
20
3
30
4
40

[UPDATE]:

Finally, I've solved this example through asyncio syntax:

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.

loop = asyncio.get_event_loop()
futures = [
    asyncio.ensure_future(coro1()),
    asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()

Out:

1
10
2
20
3
30
4
40
5
50

And another concurrency coroutine approach via async-await expression and an event-loop manager based on Heap queue algorithm, without using asyncio library and its event-loop as well as without using asyncio.sleep() method:

import heapq
from time import sleep
from datetime import datetime, timedelta

class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until

async def coro1():
    for i in range(1, 6):
        await Sleep(0)
        print(i)

async def coro2():
    for i in range(1, 6):
        await Sleep(0)
        print(i * 10)

def coro_manager(*coros):
    coros = [(datetime.now(), coro) for coro in coros]
    heapq.heapify(coros)
    while coros:
        exec_at, coro = heapq.heappop(coros)
        if exec_at > datetime.now():
            sleep((exec_at - datetime.now()).total_seconds())
        try:
            heapq.heappush(coros, (coro.send(None), coro))
        except StopIteration:
            try:
                coros.remove(coro)
            except ValueError:
                pass

coro_manager(coro1(), coro2())

Out:

1
10
2
20
3
30
4
40
5
50
like image 169
Benyamin Jafari Avatar answered Oct 16 '22 14:10

Benyamin Jafari


Usually you don't need to write low-level coroutines, using async def and awaiting inside it is a common way to achieve your goal.

However if you interested in implementation details here's source code of asyncio.sleep().

Similar to many other low-level asyncio functions it uses 3 main things to implement coroutine:

  • asyncio.Future() - "a bridge" between callbacks-world and coroutines-world
  • event loop's loop.call_later() method - on of several event loop's methods that tells directly to event loop when to do something
  • async def and await - just a syntax sugar for @asyncio.coroutine and yield from that allows to cast some function to generator (and execute it "one step at the time")

Here's my rough implementation of sleep that shows the idea:

import asyncio


# @asyncio.coroutine - for better tracebacks and edge cases, we can avoid it here
def my_sleep(delay):
    fut = asyncio.Future()

    loop = asyncio.get_event_loop()
    loop.call_later(
        delay,
        lambda *_: fut.set_result(True)
    )

    res = yield from fut
    return res


# Test:
@asyncio.coroutine
def main():
    yield from my_sleep(3)
    print('ok')


asyncio.run(main())

If you want to go lower than this you'll have to comprehend how generators (or coroutines) being managed by event loop. Video mentioned by user4815162342 - is a good place to start.

But again, all above - are details of implementation. You don't have to think about all this stuff unless you write something very-very low-level.

like image 1
Mikhail Gerasimov Avatar answered Oct 16 '22 12:10

Mikhail Gerasimov