Where do I catch the KeyboardInterrupt exception in this async setup?

I am working on a project that uses the ccxt async library, which requires all resources used by a certain class to be released with an explicit call to the class's .close() coroutine. I want to exit the program with Ctrl+C and await the close coroutine in the exception handler. However, it is never awaited.

The application consists of the modules harvesters, strategies, traders, broker, and main (plus config and such). The broker initiates the strategies specified for an exchange and executes them. The strategy initiates the associated harvester which collects the necessary data. It also analyses the data and spawns a trader when there is a profitable opportunity. The main module creates a broker for each exchange and runs it. I have tried to catch the exception at each of these levels, but the close routine is never awaited. I'd prefer to catch it in the main module in order to close all exchange instances.

Harvester

async def harvest(self):
    if not self.routes:
        self.routes = await self.get_routes()
    for route in self.routes:
        self.logger.info("Harvesting route {}".format(route))
        await asyncio.sleep(self.exchange.rateLimit / 1000)
        yield await self.harvest_route(route)

Strategy

async def execute(self):
    async for route_dct in self.harvester.harvest():
        self.logger.debug("Route dictionary: {}".format(route_dct))
        await self.try_route(route_dct)

Broker

async def run(self):
    for strategy in self.strategies:
        self.strategies[strategy] = getattr(
            strategies, strategy)(self.share, self.exchange, self.currency)
    while True:
        try:
            await self.execute_strategies()
        except KeyboardInterrupt:
            await safe_exit(self.exchange)

Main

async def main():
    await load_exchanges()
    await load_markets()
    brokers = [Broker(
        share,
        exchanges[id]["api"],
        currency,
        exchanges[id]["strategies"]
        ) for id in exchanges]
    futures = [broker.run() for broker in brokers]
    for future in asyncio.as_completed(futures):
        executed = await future
        return executed


if __name__ == "__main__":
    status = asyncio.run(main())
    sys.exit(status)

I had expected the close() coroutine to be awaited, but I still get an error from the library that I must explicitly call it. Where do I catch the exception so that all exchange instances are closed properly?

asked Feb 04 '19 23:02 by iuvbio


1 Answer

Somewhere in your code there should be an entry point where the event loop is started.

Usually it is one of the calls below:

loop.run_until_complete(main())

loop.run_forever()

asyncio.run(main())

When Ctrl+C is pressed, KeyboardInterrupt can be caught at this line. Once that has happened, you can run the event loop again to execute a finalizing coroutine.

This little example shows the idea:

import asyncio


async def main():
    print('Started, press ctrl+C')
    await asyncio.sleep(10)


async def close():
    print('Finalizing...')
    await asyncio.sleep(1)


if __name__ == '__main__':
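    # Create the loop explicitly; calling asyncio.get_event_loop() without a
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()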
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        loop.run_until_complete(close())
    finally:
        loop.close()  # release the loop's own resources
        print('Program finished')
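
Applied to a setup with several exchange objects, the same idea might look like the sketch below. It is only an illustration under assumptions: FakeExchange, run_brokers, shutdown and run_with_cleanup are hypothetical names, and FakeExchange merely stands in for any object with an async close() method. The cleanup runs on the same loop that ran the work, and it cancels whatever is still pending before awaiting each close().

```python
import asyncio


class FakeExchange:
    """Hypothetical stand-in for an object that needs an awaited close()."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    async def close(self):
        await asyncio.sleep(0)  # pretend to release network resources
        self.closed = True


async def run_brokers(exchanges):
    # Placeholder for the long-running work (e.g. the brokers' run() loops).
    await asyncio.sleep(3600)


async def shutdown(exchanges):
    # Cancel work that is still pending on this loop after the interrupt.
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Await every close(); return_exceptions=True keeps one failing close()
    # from skipping the others.
    await asyncio.gather(*(ex.close() for ex in exchanges),
                         return_exceptions=True)


def run_with_cleanup(exchanges, work=run_brokers):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(work(exchanges))
        return 0
    except KeyboardInterrupt:
        # The loop is stopped, not closed, so it can run the cleanup.
        loop.run_until_complete(shutdown(exchanges))
        return 130  # conventional exit status after SIGINT (128 + 2)
    finally:
        loop.close()
```

The entry point would then call something like sys.exit(run_with_cleanup(exchanges)). Keeping the cleanup on the original loop is a deliberate choice here: objects that were created while that loop was running may not be safe to close from a different one.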

answered Oct 01 '22 18:10 by Mikhail Gerasimov