I'm working on a Python-3 program that is trying to do two things: (1) Read data (Type 1) from an external websocket (non-blocking) and (2) Receive data (Type 2) on a regular UDP socket (non-blocking)
There are long periods of time where there is no data on neither the websocket nor the UDP socket. As such, I'm attempting to make both reads/receives for both data types non-blocking. I'm attempting to use Asyncio and Websockets to perform this for the websocket.
Unfortunately, the code below hangs whenever there is not data (Type 1) on the websocket. It blocks the rest of the code from executing. What am I doing wrong?
Thanks in advance for all the help.
import asyncio
import websockets
import socket
IP_STRATUX = "ws://192.168.86.201/traffic"
# Method to retrieve data (Type 1) from websocket
async def readWsStratux(inURL):
async with websockets.connect(inURL, close_timeout=0.1) as websocket:
try:
data = await websocket.recv()
return data
except websocket.error:
return None
if __name__ == "__main__":
# Socket to receive data (Type 2)
sockPCC = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sockPCC.bind((PCC_IP, PCC_PORT))
sockPCC.setblocking(0)
while True:
print('-----MAIN LOOP-----')
data1 = asyncio.get_event_loop().run_until_complete(
readWsStratux(IP_STRATUX))
print(f'Data 1: {data1}')
data2, addr = sockPCC.recvfrom(1024)
print(f'Data 2: {data2}')
The problem is that run_until_complete
does what it says, which is runs the provided coroutine until it returns. You need to create a coroutine that spawns two independent tasks, each running a coroutine of its own "in the background". One task will handle reading from the websocket, and the other the UDP data. Both coroutines can feed a queue which your main coroutine reads off of.
The websocket coroutine will look very similar to what you already have, but with the infinite loop pushed into the coroutine, and the data transmitted into a queue provided by the caller:
async def readWsStratux(inURL, queue):
while True:
async with websockets.connect(inURL, close_timeout=0.1) as ws:
try:
data = await ws.recv()
await queue.put(('websocket', data))
except websockets.error:
return None
Next, you will want a similar coroutine doing UDP. Instead of manually creating a non-blocking socket, it's a good idea to use asyncio's support for UDP. You can start with a simplified version of the example class from the docs:
class ClientProtocol:
def __init__(self, queue):
self.queue = queue
def datagram_received(self, data, addr):
self.queue.put_nowait(('udp', data))
def connection_lost(self, exc):
self.queue.put_nowait(('udp', b''))
async def read_udp(queue):
transport, protocol = await loop.create_datagram_endpoint(
lambda: ClientProtocol(queue),
remote_addr=(PCC_IP, PCC_PORT))
# wait until canceled
try:
await asyncio.get_event_loop().create_future()
except asyncio.CancelledError:
transport.close()
raise
With those two in place, you can write the main coroutine that spawns them and collects the data from the queue while they are running:
async def read_both(in_url):
queue = asyncio.Queue()
# spawn two workers in parallel, and have them send
# data to our queue
ws_task = asyncio.create_task(readWsStratux(in_url, queue))
udp_task = asyncio.create_task(read_udp(queue))
while True:
source, data = await queue.get()
if source == 'ws':
print('from websocket', data)
elif source == 'udp':
if data == b'':
break # lost the UDP connection
print('from UDP', data)
# terminate the workers
ws_task.cancel()
udp_task.cancel()
Your main program now consists of a trivial invocation of read_both
:
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(read_both(IP_STRATUX))
Please note that the above code is untested and may contain typos.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With