
Python asyncio: asynchronously fetch data by key from a dict when the key becomes available


As the title says, my use case is this:

I have an aiohttp server that accepts requests from clients. For each request I generate a unique request id and send a {req_id: req_payload} dict to some workers (the workers are not written in Python, so they run in another process). When the workers finish, I get the responses back and put them in a result dict like this: {req_id_1: res_1, req_id_2: res_2}.

I then want my aiohttp request handler to await on that result dict, so that as soon as the response for a specific req_id becomes available, the handler can send it back.

I built the example code below to simulate the process, but I got stuck implementing the coroutine async def fetch_correct_res(req_id), which should fetch the correct response by req_id asynchronously, without blocking.

import random
import asyncio
import shortuuid

n_tests = 1000

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id):
    pass

async def handler(req):
    res = await fetch_correct_res(req)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    for _ in range(n_tests):
        random_idx = random.choice(idxs)
        await asyncio.sleep(random_idx / 1000)
        res_dict[req_ids[random_idx]] = req_ids[random_idx]
        print("req: {} is back".format(req_ids[random_idx]))

So:

  1. Is it possible to make this solution work? If so, how?

  2. If the above solution is not possible, what would be the correct approach for this use case with asyncio?

Many thanks.


The only approach I can think of for now is to pre-create a pool of asyncio.Queue objects with pre-assigned ids and assign one queue to each incoming request. The handler simply awaits on its queue, and when the response comes back I put it into that queue only. Once the request is fulfilled, I return the queue to the pool so it can serve the next incoming request. Not very elegant, but it would solve the problem.
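A rough sketch of that queue-pool idea, to make it concrete. The names POOL_SIZE, free_queues, assigned and on_response are hypothetical and only serve the illustration; this is not a finished implementation:

```python
import asyncio

POOL_SIZE = 3  # hypothetical pool size, chosen only for this sketch


async def main():
    # Pool of idle queues; each in-flight request borrows exactly one.
    free_queues = asyncio.Queue()
    for _ in range(POOL_SIZE):
        free_queues.put_nowait(asyncio.Queue(maxsize=1))
    assigned = {}  # req_id -> queue currently borrowed for that request

    async def handler(req_id):
        q = await free_queues.get()  # waits here if every queue is in use
        assigned[req_id] = q
        try:
            return await q.get()  # suspends until the response is put
        finally:
            del assigned[req_id]
            free_queues.put_nowait(q)  # hand the queue back to the pool

    def on_response(req_id, res):
        # Route the worker's response to the queue assigned to this request.
        assigned[req_id].put_nowait(res)

    ids = ["r1", "r2", "r3"]
    tasks = [asyncio.create_task(handler(i)) for i in ids]
    await asyncio.sleep(0)  # let every handler borrow its queue
    for i in reversed(ids):  # responses come back out of order
        on_response(i, "res-" + i)
    results = await asyncio.gather(*tasks)
    return results, free_queues.qsize()


results, free_count = asyncio.run(main())
print(results, free_count)
```

One limitation of this sketch: a response that arrives for a request which has not yet been assigned a queue raises KeyError in on_response, so real code would need to handle that case.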

lnshi asked Sep 06 '19 12:09




1 Answer

See if the sample implementation below fulfils your need.

Basically, you want to answer each request (by id) with its response asynchronously, without being able to predict the order in which the responses arrive.

So at request-handling time, populate the dict with {request_id: {'event': <asyncio.Event>, 'result': <result>}} and await asyncio.Event.wait(). Once the response is received, store it in the dict and signal the event with asyncio.Event.set(). That releases the await, and the handler can then fetch the response from the dict by request id.

I modified your code slightly so that the handler registers the request id in the dict first and then awaits asyncio.Event.wait() until the response side signals the event.

import random
import asyncio
import shortuuid

n_tests = 10

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id, event):
    # Suspend until the response for this request id has been stored.
    await event.wait()
    # Remove the entry once it is consumed so res_dict does not keep growing.
    return res_dict.pop(req_id)['result']

async def handler(req):
    print("incoming request id: {}".format(req))
    # Register the request before waiting so the responder can find its event.
    event = asyncio.Event()
    res_dict[req] = {'event': event, 'result': 'pending'}
    res = await fetch_correct_res(req, event)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    random.shuffle(req_ids)
    for i in req_ids:
        await asyncio.sleep(random.randrange(2, 4))
        print("req: {} is back".format(i))
        # Only requests that a handler registered have an entry to signal.
        entry = res_dict.get(i)
        if entry is not None:
            entry['result'] = i
            entry['event'].set()

async def main():
    # Four handlers wait concurrently while responses arrive in random order.
    await asyncio.gather(handler(req_ids[0]),
                         handler(req_ids[1]),
                         handler(req_ids[2]),
                         handler(req_ids[3]),
                         randomly_put_res_to_res_dict())

asyncio.run(main())

Sample output from the code above:

incoming request id: NDhvBPqMiRbteFD5WqiLFE
incoming request id: fpmk8yC3iQcgHAJBKqe2zh
incoming request id: M7eX7qeVQfWCCBnP4FbRtK
incoming request id: v2hAfcCEhRPUDUjCabk45N
req: VeyvAEX7YGgRZDHqa2UGYc is back
req: M7eX7qeVQfWCCBnP4FbRtK is back
got correct res for req: M7eX7qeVQfWCCBnP4FbRtK
req: pVvYoyAzvK8VYaHfrFA9SB is back
req: soP8NDxeQKYjgeT7pa3wtG is back
req: j3rcg5Lp59pQXuvdjCAyZe is back
req: NDhvBPqMiRbteFD5WqiLFE is back
got correct res for req: NDhvBPqMiRbteFD5WqiLFE
req: v2hAfcCEhRPUDUjCabk45N is back
got correct res for req: v2hAfcCEhRPUDUjCabk45N
req: porzHqMqV8SAuttteHRwNL is back
req: trVVqZrUpsW3tfjQajJfb7 is back
req: fpmk8yC3iQcgHAJBKqe2zh is back
got correct res for req: fpmk8yC3iQcgHAJBKqe2zh
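As a variation on the same idea, one asyncio.Future per request id can stand in for the Event plus the separate 'result' slot, since a future carries both the signal and the value. This is only a minimal sketch under assumptions: the names pending and deliver are hypothetical, and deliver is assumed to be called from the event loop thread whenever a worker response arrives.

```python
import asyncio

# Hypothetical registry: request id -> Future that will hold the response.
pending = {}


async def fetch_correct_res(req_id):
    # Create the future on the running loop and register it before awaiting.
    fut = asyncio.get_running_loop().create_future()
    pending[req_id] = fut
    try:
        return await fut  # suspends until deliver() sets the result
    finally:
        # Always drop the entry so the registry does not grow without bound.
        pending.pop(req_id, None)


def deliver(req_id, res):
    # Resolve the waiting future; report False for unknown or finished ids.
    fut = pending.get(req_id)
    if fut is not None and not fut.done():
        fut.set_result(res)
        return True
    return False


async def demo():
    ids = ["a", "b", "c"]
    waiters = [asyncio.create_task(fetch_correct_res(i)) for i in ids]
    await asyncio.sleep(0)  # let every waiter register its future
    for i in reversed(ids):  # responses arrive out of order
        deliver(i, i.upper())
    return await asyncio.gather(*waiters)


results = asyncio.run(demo())
print(results)
```

In a real handler you would probably wrap the call in asyncio.wait_for(fetch_correct_res(req_id), timeout=...) so that a lost response does not leave the request hanging forever; the finally block still cleans up the registry entry in that case.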
Suresh Kumar answered Nov 01 '22 17:11