Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python asyncio - RuntimeError: await wasn't used with future

I want to use a semaphore with a gather() to limit api calls. I think I have to use create_task() but I obtain a runtime error: "RuntimeError: await wasn't used with future". How can I fix it?

Here is the code:

import asyncio
# pip install git+https://github.com/sammchardy/python-binance.git@00dc9a978590e79d4aa02e6c75106a3632990e8d
from binance import AsyncClient


async def catch_up_aggtrades(client, symbols):
    tasks = asyncio.create_task(get_historical_aggtrades(client, symbol) for symbol in symbols)
    sem = asyncio.Semaphore(1)
    async with sem:
        await asyncio.gather(*tasks)


async def get_historical_aggtrades(client, symbol):
    async for trade in client.aggregate_trade_iter(symbol, '1 day ago UTC'):
        print(f"symbol {symbol}")


async def main():
    client = await AsyncClient.create()
    symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
    await catch_up_aggtrades(client, symbols)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
like image 747
lovelace63 Avatar asked May 13 '20 14:05

lovelace63


1 Answers

A sempahore limiting a resource usage is actually a very simple concept. It is similar to counting free parking lots. (-1 when a car enters, +1 when it leaves). When the counter drops to zero, a queue of waiting cars starts to build.

That means:

  • one semaphore per resource
  • initial value = upper limit of concurrent resource users
  • each resource usage is guarded by async with sem:

The existing code:

sem = asyncio.Semaphore(1)
async with sem:
    await asyncio.gather(*tasks)

limits the use of asyncio.gather to 1 task gathering at a time. It does not limit the tasks, just their gathering. Since the gather is called just once anyway, the semaphore does not change anything.

Your program might be changed to (including the issue resolved in comments):

LIMIT = 1

async def catch_up_aggtrades(client, symbols):
    sem = asyncio.Semaphore(LIMIT)
    tasks = [asyncio.create_task(get_historical_aggtrades(client, symbol, sem)) for symbol in symbols]
    await asyncio.gather(*tasks)

async def get_historical_aggtrades(client, symbol, sem):
    async with sem:
        async for trade in client.aggregate_trade_iter(symbol, '1 day ago UTC'):
            print(f"symbol {symbol}")
like image 142
VPfB Avatar answered Nov 17 '22 17:11

VPfB