
Python event handler with Async (non-blocking while loop)

import asyncio
import queue

qq = queue.Queue()
qq.put('hi')

class MyApp():

    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    def get_item(self):
        try:
            item = self._queue.get_nowait()
            self._process_item(item)
        except queue.Empty:
            pass

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            self.get_item()
            await asyncio.sleep(0)      

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

Using Python 3.6.

I'm trying to write an event handler that constantly listens for messages in the queue, and processes them (prints them in this case). But it must be asynchronous - I need to be able to run it in a terminal (IPython) and manually feed things to the queue (at least initially, for testing).

This code does not work - it blocks forever.

How do I make this run forever but return control after each iteration of the while loop?

Thanks.

Side note: to make the event loop work with IPython (version 7.2), I'm using this code from the ib_insync library, which is also the library I'm using for the real-world problem behind the example above.

asked Mar 13 '19 21:03 by Josh D




1 Answer

You need to make your queue an asyncio.Queue, and add things to the queue in a thread-safe manner. For example:

import asyncio

qq = asyncio.Queue()

class MyApp():
    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    async def get_item(self):
        item = await self._queue.get()
        self._process_item(item)

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            await self.get_item()

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

Your other thread must put stuff in the queue like this:

loop.call_soon_threadsafe(qq.put_nowait, <item>)

call_soon_threadsafe will ensure correct locking, and also that the event loop is woken up when a new queue item is ready.
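To see the two pieces working together, here is a minimal self-contained sketch. It is only an illustration under some assumptions of mine: the `producer` function, the `main` and `run_demo` helpers, the `processed` list, and the `None` sentinel used to stop the listener are all hypothetical names that are not part of the question's code, and the real producer would be whatever thread feeds your orders.

```python
import asyncio
import threading
import time


class MyApp:
    def __init__(self, q):
        self._queue = q
        self.processed = []  # hypothetical: kept only so the result can be inspected

    def _process_item(self, item):
        print(f'Processing this item: {item}')
        self.processed.append(item)

    async def listen_for_orders(self):
        while True:
            # Suspends this coroutine (without blocking the loop) until an item arrives.
            item = await self._queue.get()
            if item is None:  # hypothetical sentinel so that the demo terminates
                break
            self._process_item(item)


def producer(loop, q, items):
    """Runs in a separate thread and hands items to the event loop."""
    for item in items:
        # The only safe way to touch an asyncio.Queue from another thread.
        loop.call_soon_threadsafe(q.put_nowait, item)
        time.sleep(0.01)
    loop.call_soon_threadsafe(q.put_nowait, None)


async def main(items):
    loop = asyncio.get_event_loop()  # inside a coroutine this is the running loop
    q = asyncio.Queue()
    app = MyApp(q)
    t = threading.Thread(target=producer, args=(loop, q, items))
    t.start()
    await app.listen_for_orders()
    t.join()  # the producer has already sent the sentinel, so this returns quickly
    return app.processed


def run_demo(items):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main(items))
    finally:
        loop.close()


if __name__ == '__main__':
    run_demo(['order-1', 'order-2'])
```

In a real application you would probably drop the sentinel and let the listener run for as long as the loop does; it is there only so the example finishes on its own.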

answered Oct 24 '22 17:10 by user4815162342