I have a bog-standard synchronous python program that needs to be able to read data from websockets and update the GUI with the data. However, asyncio creep is constantly tripping me up.
How do I make a module that:
Here's what I have already, but it's failing at many points:
run_forever()
means that the loop gets stuck before the subscription completes and then handle()
is stuck in the falsey while
loopconnected
property (websocket without an s does, but I'm not clear on the differences and can't find info online either)Been fighting with this for weeks. Would appreciate some pointers.
class WSClient():
subscriptions = set()
connections = {}
started = False
def __init__(self):
self.loop = asyncio.get_event_loop()
def start(self):
self.started = True
self.loop.run_until_complete(self.handle())
self.loop.run_until_forever() # problematic, because it does not allow new subscribe() events
async def handle(self):
while len(self.connections) > 0:
# listen to every websocket
futures = [self.listen(self.connections[url]) for url in self.connections]
done, pending = await asyncio.wait(futures)
# the following is apparently necessary to avoid warnings
# about non-retrieved exceptions etc
try:
data, ws = done.pop().result()
except Exception as e:
print("OTHER EXCEPTION", e)
for task in pending:
task.cancel()
async def listen(self, ws):
try:
async for data in ws:
data = json.loads(data)
# call the subscriber (listener) back when there's data
[s.listener._handle_result(data) for s in self.subscriptions if s.ws == ws]
except Exception as e:
print('ERROR LISTENING; RESTARTING SOCKET', e)
await asyncio.sleep(2)
self.restart_socket(ws)
def subscribe(self, subscription):
task = self.loop.create_task(self._subscribe(subscription))
asyncio.gather(task)
if not self.started:
self.start()
async def _subscribe(self, subscription):
try:
ws = self.connections.get(subscription.url, await websockets.connect(subscription.url))
await ws.send(json.dumps(subscription.sub_msg))
subscription.ws = ws
self.connections[subscription.url] = ws
self.subscriptions.add(subscription)
except Exception as e:
print("ERROR SUBSCRIBING; RETRYING", e)
await asyncio.sleep(2)
self.subscribe(subscription)
def restart_socket(self, ws):
for s in self.subscriptions:
if s.ws == ws and not s.ws.connected:
print(s)
del self.connections[s.url]
self.subscribe(s)
I have a bog-standard synchronous python program that needs to be able to read data from websockets and update the GUI with the data. However, asyncio creep is constantly tripping me up.
As you mentioned GUI, then it is probably not a "bog-standard synchronous python program". Usually a GUI program has a non-blocking event-driven main thread, which allows concurrent user behaviors and callbacks. That is very much similar to asyncio, and it is usually a common way for asyncio to work together with GUIs to use GUI-specific event loop to replace default event loop in asyncio, so that your asyncio coroutines just run in GUI event loop and you can avoid calling run_forever()
blocking everything.
An alternative way is to run asyncio event loop in a separate thread, so that your program could at the same time wait for websocket data and wait for user clicks. I've rewritten your code as follows:
import asyncio
import threading
import websockets
import json
class WSClient(threading.Thread):
def __init__(self):
super().__init__()
self._loop = None
self._tasks = {}
self._stop_event = None
def run(self):
self._loop = asyncio.new_event_loop()
self._stop_event = asyncio.Event(loop=self._loop)
try:
self._loop.run_until_complete(self._stop_event.wait())
self._loop.run_until_complete(self._clean())
finally:
self._loop.close()
def stop(self):
self._loop.call_soon_threadsafe(self._stop_event.set)
def subscribe(self, url, sub_msg, callback):
def _subscribe():
if url not in self._tasks:
task = self._loop.create_task(
self._listen(url, sub_msg, callback))
self._tasks[url] = task
self._loop.call_soon_threadsafe(_subscribe)
def unsubscribe(self, url):
def _unsubscribe():
task = self._tasks.pop(url, None)
if task is not None:
task.cancel()
self._loop.call_soon_threadsafe(_unsubscribe)
async def _listen(self, url, sub_msg, callback):
try:
while not self._stop_event.is_set():
try:
ws = await websockets.connect(url, loop=self._loop)
await ws.send(json.dumps(sub_msg))
async for data in ws:
data = json.loads(data)
# NOTE: please make sure that `callback` won't block,
# and it is allowed to update GUI from threads.
# If not, you'll need to find a way to call it from
# main/GUI thread (similar to `call_soon_threadsafe`)
callback(data)
except Exception as e:
print('ERROR; RESTARTING SOCKET IN 2 SECONDS', e)
await asyncio.sleep(2, loop=self._loop)
finally:
self._tasks.pop(url, None)
async def _clean(self):
for task in self._tasks.values():
task.cancel()
await asyncio.gather(*self._tasks.values(), loop=self._loop)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With