Non-Blocking Websocket Receive with Asyncio

I'm working on a Python 3 program that needs to do two things: (1) read data (Type 1) from an external websocket, and (2) receive data (Type 2) on a regular UDP socket. Neither operation should block.

There are long periods when there is no data on either the websocket or the UDP socket, so I'm trying to make both reads non-blocking. For the websocket, I'm attempting to use asyncio and the websockets library.

Unfortunately, the code below hangs whenever there is no data (Type 1) on the websocket, which blocks the rest of the code from executing. What am I doing wrong?

Thanks in advance for all the help.

import asyncio
import websockets
import socket

IP_STRATUX =    "ws://192.168.86.201/traffic"

# Method to retrieve data (Type 1) from websocket
async def readWsStratux(inURL):
    async with websockets.connect(inURL, close_timeout=0.1) as websocket:
        try:
            data = await websocket.recv()
            return data
        except websocket.error:
            return None

if __name__ == "__main__":
    # Socket to receive data (Type 2)
    sockPCC = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sockPCC.bind((PCC_IP, PCC_PORT))
    sockPCC.setblocking(0)

    while True:
        print('-----MAIN LOOP-----')
        data1 = asyncio.get_event_loop().run_until_complete(
            readWsStratux(IP_STRATUX))
        print(f'Data 1: {data1}')
        data2, addr = sockPCC.recvfrom(1024)
        print(f'Data 2: {data2}')

Jordan H. asked Sep 14 '19 23:09

1 Answer

The problem is that run_until_complete does what it says: it runs the provided coroutine until it returns, and websocket.recv() does not return until a message arrives. Instead, create a coroutine that spawns two independent tasks, each running a coroutine of its own "in the background". One task handles reading from the websocket, and the other handles the UDP data. Both coroutines can feed a queue that your main coroutine reads from.
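
Before wiring in real sockets, the pattern can be tried with the standard library alone. The following is only a minimal sketch: the `producer` coroutine, its names, and its delays are hypothetical stand-ins for the websocket and UDP readers, chosen to show that a slow source does not hold up a fast one.

```python
import asyncio

async def producer(name, delay, count, queue):
    # Hypothetical stand-in for a network reader: emits `count` items,
    # sleeping `delay` seconds before each one.
    for i in range(count):
        await asyncio.sleep(delay)
        await queue.put((name, i))

async def collect(total):
    queue = asyncio.Queue()
    # Both producers run concurrently as background tasks.
    tasks = [
        asyncio.create_task(producer('fast', 0.01, 3, queue)),
        asyncio.create_task(producer('slow', 0.05, 1, queue)),
    ]
    # The consumer wakes up whenever either producer has data.
    received = [await queue.get() for _ in range(total)]
    for t in tasks:
        t.cancel()
    return received

if __name__ == "__main__":
    print(asyncio.run(collect(4)))
```

The consumer never polls: `queue.get()` simply suspends until one of the tasks puts something in the queue.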

The websocket coroutine will look very similar to what you already have, but with the infinite loop pushed into the coroutine, and the data transmitted into a queue provided by the caller:

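async def readWsStratux(inURL, queue):
    while True:
        async with websockets.connect(inURL, close_timeout=0.1) as ws:
            try:
                data = await ws.recv()
                # Tag each item with its source so the consumer can tell them apart
                await queue.put(('websocket', data))
            except websockets.ConnectionClosed:
                # Stop this reader if the server closes the connection
                return None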

Next, you will want a similar coroutine doing UDP. Instead of manually creating a non-blocking socket, it's a good idea to use asyncio's support for UDP. You can start with a simplified version of the example class from the docs:

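class ClientProtocol(asyncio.DatagramProtocol):
    # Subclassing DatagramProtocol supplies no-op defaults for the
    # callbacks not overridden here (e.g. connection_made)
    def __init__(self, queue):
        self.queue = queue

    def datagram_received(self, data, addr):
        self.queue.put_nowait(('udp', data))

    def connection_lost(self, exc):
        # An empty payload tells the consumer that the endpoint is gone
        self.queue.put_nowait(('udp', b''))

async def read_udp(queue):
    loop = asyncio.get_running_loop()
    # local_addr binds the socket for receiving, like sockPCC.bind() in the question
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: ClientProtocol(queue),
        local_addr=(PCC_IP, PCC_PORT))
    # wait until canceled
    try:
        await loop.create_future()
    except asyncio.CancelledError:
        transport.close()
        raise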
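
To check the UDP side in isolation, a loopback round trip is one option. This is a sketch under stated assumptions: `QueueProtocol` and `udp_roundtrip` are hypothetical names, the endpoint binds to 127.0.0.1 on an OS-chosen port instead of the question's PCC_IP and PCC_PORT, and the datagram is sent from an ordinary blocking socket in the same process.

```python
import asyncio
import socket

class QueueProtocol(asyncio.DatagramProtocol):
    # Hypothetical name; forwards each received datagram to an asyncio.Queue.
    def __init__(self, queue):
        self.queue = queue

    def datagram_received(self, data, addr):
        self.queue.put_nowait(('udp', data))

async def udp_roundtrip(payload):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    # Port 0 asks the OS for any free port on the loopback interface.
    transport, _ = await loop.create_datagram_endpoint(
        lambda: QueueProtocol(queue), local_addr=('127.0.0.1', 0))
    try:
        port = transport.get_extra_info('sockname')[1]
        # Send one datagram to ourselves with a plain blocking socket.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
            sender.sendto(payload, ('127.0.0.1', port))
        # Give up after two seconds instead of waiting forever.
        return await asyncio.wait_for(queue.get(), timeout=2)
    finally:
        transport.close()

if __name__ == "__main__":
    print(asyncio.run(udp_roundtrip(b'ping')))
```

The callback uses `put_nowait` because protocol methods are plain functions, not coroutines, so they cannot `await queue.put(...)`.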

With those two in place, you can write the main coroutine that spawns them and collects the data from the queue while they are running:

async def read_both(in_url):
    queue = asyncio.Queue()
    # spawn two workers in parallel, and have them send
    # data to our queue
    ws_task = asyncio.create_task(readWsStratux(in_url, queue))
    udp_task = asyncio.create_task(read_udp(queue))

    while True:
        source, data = await queue.get()
        if source == 'websocket':
            print('from websocket', data)
        elif source == 'udp':
            if data == b'':
                break  # lost the UDP connection
            print('from UDP', data)

    # terminate the workers
    ws_task.cancel()
    udp_task.cancel()
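
Note that `cancel()` only requests cancellation; each task actually stops the next time it reaches an `await`. If the workers should be guaranteed to have finished their cleanup (for example, closing the UDP transport) before the main coroutine returns, one option is to await them with `asyncio.gather(..., return_exceptions=True)`. A minimal sketch, using a hypothetical `worker` coroutine in place of the two readers:

```python
import asyncio

async def worker(log):
    # Hypothetical worker that waits forever and records its cleanup.
    try:
        await asyncio.Event().wait()
    finally:
        log.append('cleaned up')

async def stop_workers():
    log = []
    tasks = [asyncio.create_task(worker(log)) for _ in range(2)]
    await asyncio.sleep(0)  # yield once so both workers start running
    for t in tasks:
        t.cancel()
    # Wait for cancellation to finish; with return_exceptions=True the
    # CancelledError instances are returned as results, not raised.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return log, results

if __name__ == "__main__":
    print(asyncio.run(stop_workers()))
```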

Your main program now consists of a trivial invocation of read_both:

if __name__ == "__main__":
    asyncio.run(read_both(IP_STRATUX))

Please note that the above code is untested and may contain typos.

user4815162342 answered Nov 15 '22 23:11