I'm trying to write my own awaitable function that can be used in an asyncio loop, like asyncio.sleep() or the other built-in awaitables.
Here is what I've done so far:
import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

loop = asyncio.get_event_loop()
futures = [asyncio.ensure_future(async_wrapper())]
result = loop.run_until_complete(asyncio.gather(*futures))
print(result)
loop.close()
Out:
1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1
Expected:
1
10
2
20
3
30
...
[NOTE]:
I found a concurrent/asynchronous approach using generators. However, it's not an asyncio approach:
from collections import deque

def coro1():
    for i in range(1, 5):
        yield i

def coro2():
    for i in range(1, 5):
        yield i*10

print('Async behaviour using default list with O(n)'.center(60, '#'))
tasks = list()
tasks.extend([coro1(), coro2()])
while tasks:
    task = tasks.pop(0)  # O(n): every remaining item shifts left.
    try:
        print(next(task))
        tasks.append(task)
    except StopIteration:
        pass

print('Async behaviour using deque with O(1)'.center(60, '#'))
tasks = deque()
tasks.extend([coro1(), coro2()])
while tasks:
    task = tasks.popleft()  # select and remove a task (coro1/coro2).
    try:
        print(next(task))
        tasks.append(task)  # put the task back at the end (round-robin).
    except StopIteration:
        pass  # the task is finished, so it is not re-queued.
Out:
########Async behaviour using default list with O(n)########
1
10
2
20
3
30
4
40
###########Async behaviour using deque with O(1)############
1
10
2
20
3
30
4
40
[UPDATE]:
Finally, I've solved this example using asyncio syntax:
import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches tasks on every iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches tasks on every iteration.

loop = asyncio.get_event_loop()
futures = [
    asyncio.ensure_future(coro1()),
    asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()
Out:
1
10
2
20
3
30
4
40
5
50
And here is another concurrent coroutine approach, using async/await expressions and an event-loop manager based on the heap queue algorithm, without the asyncio library, its event loop, or asyncio.sleep():
import heapq
from datetime import datetime, timedelta
from itertools import count
from time import sleep

class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        # Hand the wake-up time to the manager that drives this coroutine.
        yield self.sleep_until

async def coro1():
    for i in range(1, 6):
        await Sleep(0)
        print(i)

async def coro2():
    for i in range(1, 6):
        await Sleep(0)
        print(i * 10)

def coro_manager(*coros):
    # The counter breaks ties between equal timestamps, so heapq never has
    # to compare two coroutine objects (which would raise TypeError).
    order = count()
    coros = [(datetime.now(), next(order), coro) for coro in coros]
    heapq.heapify(coros)
    while coros:
        exec_at, _, coro = heapq.heappop(coros)
        delay = (exec_at - datetime.now()).total_seconds()
        if delay > 0:
            sleep(delay)
        try:
            # Resume the coroutine; it yields the time it wants to run next.
            heapq.heappush(coros, (coro.send(None), next(order), coro))
        except StopIteration:
            pass  # finished; it was already popped from the heap.

coro_manager(coro1(), coro2())
Out:
1
10
2
20
3
30
4
40
5
50
Usually you don't need to write low-level coroutines; using async def and awaiting inside it is the common way to achieve your goal.
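As a minimal sketch of that high-level route (the helper name `delayed_value` and its parameters are made up for illustration): any `async def` function is already awaitable, so composing built-in awaitables such as asyncio.sleep() is often enough.

```python
import asyncio


async def delayed_value(delay, value):
    """Hypothetical helper: wait `delay` seconds, then return `value`."""
    await asyncio.sleep(delay)  # built-in awaitable; yields control to the loop
    return value


async def main():
    # Both calls run concurrently; gather returns results in argument order.
    return await asyncio.gather(
        delayed_value(0.02, "slow"),
        delayed_value(0.01, "fast"),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The results come back in the order the awaitables were passed to gather, not in the order they finished.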
However, if you are interested in implementation details, here's the source code of asyncio.sleep().
Similar to many other low-level asyncio functions, it uses 3 main things to implement a coroutine:
- asyncio.Future: an object that bridges callback-based code and coroutines; a callback sets its result while a coroutine waits on it
- loop.call_later(): one of several event loop methods that tell the event loop directly when to do something
- async def and await: just syntactic sugar for @asyncio.coroutine and yield from, which turn a function into a generator (and execute it "one step at a time")
Here's my rough implementation of sleep that shows the idea:
import asyncio

# @asyncio.coroutine - for better tracebacks and edge cases, we can avoid it here
def my_sleep(delay):
    fut = asyncio.Future()
    loop = asyncio.get_event_loop()
    loop.call_later(
        delay,
        lambda *_: fut.set_result(True)
    )
    res = yield from fut
    return res

# Test:
@asyncio.coroutine
def main():
    yield from my_sleep(3)
    print('ok')

asyncio.run(main())
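The same idea can be written with `async def` and `await`, which matters on newer interpreters: `@asyncio.coroutine` was deprecated in Python 3.8 and removed in 3.11. What follows is only a sketch of the technique, not how asyncio.sleep() itself is written, and the name `my_sleep_async` is hypothetical.

```python
import asyncio
import time


async def my_sleep_async(delay, result=True):
    """Hypothetical async/await rewrite of my_sleep() above."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # future bound to the running loop
    # Ask the loop to resolve the future after `delay` seconds.
    handle = loop.call_later(delay, fut.set_result, result)
    try:
        return await fut  # suspends here until set_result() is called
    finally:
        handle.cancel()  # no-op if the callback has already run


async def main():
    start = time.monotonic()
    res = await my_sleep_async(0.1)
    return res, time.monotonic() - start


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Cancelling the timer handle in `finally` is a design choice for the case where the awaiting task is cancelled early: without it, the callback would later try to set a result on an already cancelled future.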
If you want to go lower than this, you'll have to understand how generators (or coroutines) are managed by the event loop. The video mentioned by user4815162342 is a good place to start.
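To make that less abstract, here is a toy sketch of what "managed by the event loop" means: a coroutine is advanced with `send()`, and each `await` on a low-level awaitable hands a value back to whoever is driving it. The `Ping` class and the `drive` function are invented for this illustration and are not part of asyncio.

```python
class Ping:
    """Hypothetical awaitable: hands `label` to the driver, then resumes."""

    def __init__(self, label):
        self.label = label

    def __await__(self):
        # The yielded value travels up to whoever called send() on the coroutine.
        reply = yield self.label
        return reply  # becomes the value of the `await` expression


async def worker():
    first = await Ping("a")
    second = await Ping("b")
    return [first, second]


def drive(coro):
    """Toy driver: run one coroutine to completion, recording what it yields."""
    seen = []
    to_send = None  # the first send() must be None
    while True:
        try:
            yielded = coro.send(to_send)
        except StopIteration as stop:
            return seen, stop.value  # stop.value is the coroutine's return value
        seen.append(yielded)
        to_send = yielded.upper()  # sent back in as the result of the await


if __name__ == "__main__":
    print(drive(worker()))
```

asyncio's own driver, Task, is stricter about what may be yielded to it: it accepts a bare `yield` or a Future, which is why yielding a plain integer as in the question ends in "RuntimeError: Task got bad yield".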
But again, all of the above are implementation details. You don't have to think about any of this unless you are writing something very low-level.