
asyncio/aiohttp - How to make series of async - but dependent - requests?

I have a sequence of pairs of async requests in which a pair consists of Request A and Request B. Additionally, Request B is dependent on Request A. In other words, I need to pass data from Response A into Request B. Therefore, I need to schedule tasks such that each task sends Request A, then sends Request B only after Response A has returned.

from aiohttp import ClientSession
from typing import List, Tuple
import asyncio

async def request_A(url: str, session: ClientSession) -> dict:
    async with session.request('get', url) as response:
        return await response.json()

async def request_B(url: str, data: dict, session: ClientSession) -> dict:
    async with session.request('post', url, json=data) as response:
        return await response.json()

async def request_chain(url_A: str, url_B: str, session: ClientSession) -> dict:
    response_A_data = await request_A(url_A, session)
    response_B_data = await request_B(url_B, response_A_data, session)
    return response_B_data

async def schedule(url_chains: List[Tuple[str, str]]) -> list:
    tasks = []
    async with ClientSession() as session:
        for url_chain in url_chains:
            url_A, url_B = url_chain
            task = asyncio.create_task(request_chain(url_A, url_B, session))
            tasks.append(task)
        return await asyncio.gather(*tasks)

def run_tasks(url_chains: List[Tuple[str, str]]) -> list:
    return asyncio.run(schedule(url_chains))

Now, my question: Per each task consisting of a pair of requests, is Request A guaranteed to return before Request B is sent? Please explain. I am concerned that within the task, while Request A is being awaited, Request B may execute.

If not, how can I keep the tasks async and non-blocking, but also ensure that within the task, Request A blocks execution of Request B until Response A has returned?

I understand that I can run all Request A calls in a batch, then run all Request B calls in a batch, but for reasons specific to my use case, I need to run a batch of all (Request A, Request B) pairs.

Andrew asked Nov 07 '19 22:11



1 Answer

Per each task consisting of a pair of requests, is Request A guaranteed to return before Request B is sent?

Yes. The advantage of the async/await pattern is that you don't have to ask yourself this question: consecutive lines of code inside a coroutine are always executed sequentially (though not necessarily back to back, since other coroutines may run in between). Here, your function request_chain guarantees that request_A completes before request_B starts.

while Request A is being awaited, Request B may execute

That won't happen. That is essentially what await means: wait here until request A has returned before going any further. In other words, await has no impact on the execution order inside a coroutine. It only hands control back to the event loop so that idle time can be used by someone else (in your case, code from another (A, B) request pair). That is why consecutive lines of code are not necessarily executed back to back: while this coroutine is suspended at an await, another coroutine (the "someone else" just mentioned) can run its own code between A and B.
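To see this without any network access, here is a minimal sketch that replaces the HTTP calls with asyncio.sleep. The names fake_request, chain, run_chains and a_before_b are made up for this illustration, and the delays are arbitrary. It records when each fake request starts and ends, so you can check that every B starts only after its own A has ended, even though the pairs interleave with each other.

```python
import asyncio

async def fake_request(name: str, delay: float, log: list) -> str:
    # Hypothetical stand-in for an HTTP call: asyncio.sleep suspends this
    # coroutine and lets the event loop run other coroutines meanwhile.
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")
    return name

async def chain(i: int, delay: float, log: list) -> str:
    # B is only started after the await on A has completed.
    await fake_request(f"A{i}", delay, log)
    return await fake_request(f"B{i}", delay, log)

async def run_chains(n: int = 3) -> list:
    log = []
    # Different delays per pair so that the pairs visibly interleave.
    await asyncio.gather(*(chain(i, 0.01 * (n - i), log) for i in range(n)))
    return log

def a_before_b(log: list, n: int) -> bool:
    # True if, for every pair, "end A" was logged before "start B".
    return all(log.index(f"end A{i}") < log.index(f"start B{i}") for i in range(n))

if __name__ == "__main__":
    events = asyncio.run(run_chains())
    print(events)
    print(a_before_b(events, 3))
```

The log shows all three A requests starting before any of them ends, which is the interleaving between pairs, while a_before_b confirms the ordering inside each pair.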

Even if that's a bit inaccurate, you can remember this: the only code that runs concurrently is the code you schedule yourself (in this case using asyncio.gather, which schedules several (A, B) pairs to run concurrently).
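As a rough illustration of that rule, the sketch below runs the same three fake jobs twice: once awaited one after the other, and once scheduled together with asyncio.gather. The helper names (work, one_at_a_time, all_at_once, timed) and the 0.05 second delay are assumptions chosen for the example, and the measured times will vary from machine to machine.

```python
import asyncio
import time

async def work(delay: float) -> float:
    # Hypothetical job that just waits, standing in for a network request.
    await asyncio.sleep(delay)
    return delay

async def one_at_a_time(delays):
    # Nothing is scheduled concurrently here: each await finishes
    # before the next job is even created.
    return [await work(d) for d in delays]

async def all_at_once(delays):
    # gather schedules every job, so their waiting periods overlap.
    return await asyncio.gather(*(work(d) for d in delays))

async def timed(coro):
    # Returns the coroutine's result together with the elapsed wall time.
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start

if __name__ == "__main__":
    delays = [0.05, 0.05, 0.05]
    _, sequential = asyncio.run(timed(one_at_a_time(delays)))
    _, concurrent = asyncio.run(timed(all_at_once(delays)))
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential version should take roughly the sum of the delays, while the gathered version should take roughly the longest single delay.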

I understand that I can run all Request A calls in a batch, then run all Request B calls in a batch, but for reasons specific to my use case, I need to run a batch of all ...

In this particular case, even if you could run a batch of A's and then a batch of B's, I think your solution is better because it expresses the relation between A and B more simply.
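For comparison, here is a small sketch of both approaches using asyncio.sleep in place of real requests. The function names (step, batched, pair, paired) and the delays are hypothetical. In the batched version every B has to wait for the slowest A, whereas in the paired version a fast pair can move on to its B while a slow A is still pending.

```python
import asyncio

async def step(name: str, delay: float, log: list) -> None:
    # Hypothetical stand-in for one request; logs its start and end.
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")

async def batched(delays) -> list:
    # All A's first, then all B's: two separate gather calls.
    log = []
    await asyncio.gather(*(step(f"A{i}", d, log) for i, d in enumerate(delays)))
    await asyncio.gather(*(step(f"B{i}", d, log) for i, d in enumerate(delays)))
    return log

async def pair(i: int, delay: float, log: list) -> None:
    # One (A, B) pair: B starts as soon as this pair's own A is done.
    await step(f"A{i}", delay, log)
    await step(f"B{i}", delay, log)

async def paired(delays) -> list:
    # A single gather over (A, B) pairs, as in the question.
    log = []
    await asyncio.gather(*(pair(i, d, log) for i, d in enumerate(delays)))
    return log

if __name__ == "__main__":
    delays = [0.01, 0.1]  # pair 0 is fast, pair 1 is slow
    print(asyncio.run(batched(delays)))
    print(asyncio.run(paired(delays)))
```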

Here is a code sample you can run to try things out. It does the same thing as your code, using a public math API: it simply computes "x*2+2" in two steps, first "*2" (the equivalent of request A), then "+2" (the equivalent of request B):

MATH_API_URL = "http://api.mathjs.org/v4"

from aiohttp import ClientSession
import asyncio

async def maths(session, url, expression):
    params = {"expr" : expression}
    print(f"\t> computing {expression}")
    async with session.get(url, params=params) as response:
        result = await response.text()
        print(f"\t< {expression} = {result}")
        return result

async def twice(session, x):
    return await maths(session, MATH_API_URL, f"2 * {x}")

async def plus_two(session, x):
    return await maths(session, MATH_API_URL, f"2 + {x}")

async def twice_plus_two(session, x):
    twice_x = await twice(session, x)
    return await plus_two(session, twice_x)

async def main(inputs):
    async with ClientSession() as session:
        return await asyncio.gather(*(twice_plus_two(session, x) for x in inputs))

inputs = list(range(3))
print([x*2+2 for x in inputs])
print(asyncio.run(main(inputs)))

This code prints the order in which requests are sent and responses are received:

[2, 4, 6]    
    > computing 2 * 0    
    > computing 2 * 1    
    > computing 2 * 2    
    < 2 * 1 = 2    
    > computing 2 + 2
    < 2 * 0 = 0    
    > computing 2 + 0
    < 2 * 2 = 4    
    > computing 2 + 4
    < 2 + 2 = 4    
    < 2 + 4 = 6
    < 2 + 0 = 2
['2', '4', '6']

See how each "+2" request is sent as soon as the corresponding "*2" has returned.
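Note also that the final list is in input order even though the responses came back in a different order. asyncio.gather returns results in the order of the awaitables it was given, not in completion order. A minimal sketch of that behaviour, with a made-up delayed_echo helper and arbitrary delays:

```python
import asyncio

async def delayed_echo(value: int, delay: float, finished: list) -> int:
    # Hypothetical helper: waits, records when it finished, returns its input.
    await asyncio.sleep(delay)
    finished.append(value)
    return value

async def demo():
    finished = []
    # Value 0 gets the longest delay, so it finishes last.
    results = await asyncio.gather(
        delayed_echo(0, 0.09, finished),
        delayed_echo(1, 0.06, finished),
        delayed_echo(2, 0.03, finished),
    )
    return results, finished

if __name__ == "__main__":
    results, finished = asyncio.run(demo())
    print("results (input order):", results)
    print("completion order:", finished)
```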

cglacet answered Oct 22 '22 15:10
