Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python: Websockets in Synchronous Program

I have a bog-standard synchronous python program that needs to be able to read data from websockets and update the GUI with the data. However, asyncio creep is constantly tripping me up.

How do I make a module that:

  1. accepts multiple subscriptions to multiple sources
  2. sends an update to the requester whenever there's data
  3. opens exactly one websocket connection per URL
  4. resets the websocket if it closes

Here's what I have already, but it's failing at many points:

  1. run_forever() means that the loop gets stuck before the subscription completes and then handle() is stuck in the falsey while loop
  2. it does not seem to want to restart sockets when they're down because a websockets object does not have a connected property (websocket without an s does, but I'm not clear on the differences and can't find info online either)
  3. I'm absolutely not sure if my approach is remotely correct.

Been fighting with this for weeks. Would appreciate some pointers.

class WSClient():
    """Asker's attempt at a websocket multiplexer driven from synchronous code.

    Keeps a registry of subscriptions and of open websocket connections
    (keyed by URL) and tries to drive them from a blocking asyncio loop.
    This listing is the non-working code the question is about; the review
    notes below mark the spots that cause the reported problems.
    """

    # NOTE(review): these three are class attributes, so the set and the dict
    # are shared by every WSClient instance. `self.started = True` in start()
    # creates a per-instance attribute that shadows the class-level flag.
    subscriptions = set()
    connections = {}
    started = False

    def __init__(self):
        """Remember the event loop returned by ``asyncio.get_event_loop()``."""
        self.loop = asyncio.get_event_loop()

    def start(self):
        """Mark the client as started and block running the event loop."""
        self.started = True
        self.loop.run_until_complete(self.handle())
        # NOTE(review): asyncio loops provide run_forever(); `run_until_forever`
        # looks like a typo and would raise AttributeError if reached.
        self.loop.run_until_forever()  # problematic, because it does not allow new subscribe() events

    async def handle(self):
        """Listen on every registered connection for as long as any exist.

        The loop body never runs if ``self.connections`` is still empty when
        this coroutine is first scheduled.
        """
        while len(self.connections) > 0:
            # listen to every websocket
            futures = [self.listen(self.connections[url]) for url in self.connections]
            done, pending = await asyncio.wait(futures)

            # the following is apparently necessary to avoid warnings
            # about non-retrieved exceptions etc
            # NOTE(review): listen() has no return statement, so result() is
            # None and the tuple unpacking below fails; that failure is what
            # the except branch ends up printing.
            try:
                data, ws = done.pop().result()
            except Exception as e:
                print("OTHER EXCEPTION", e)

            for task in pending:
                task.cancel()

    async def listen(self, ws):
        """Forward each JSON message from ``ws`` to its subscribers.

        Every subscription whose ``ws`` equals this socket has
        ``listener._handle_result`` called with the decoded message. On any
        exception the error is printed and, after a 2 second pause,
        ``restart_socket`` is invoked for this socket.
        """
        try:
            async for data in ws:
                data = json.loads(data)
                # call the subscriber (listener) back when there's data
                [s.listener._handle_result(data) for s in self.subscriptions if s.ws == ws]
        except Exception as e:
            print('ERROR LISTENING; RESTARTING SOCKET', e)
            await asyncio.sleep(2)
            self.restart_socket(ws)

    def subscribe(self, subscription):
        """Schedule ``_subscribe`` for ``subscription`` and start if needed.

        The first call ends up in ``start()``, which blocks the caller.
        """
        task = self.loop.create_task(self._subscribe(subscription))
        # The future returned by gather() is neither stored nor awaited.
        asyncio.gather(task)

        if not self.started:
            self.start()

    async def _subscribe(self, subscription):
        """Send the subscription message and register the subscription.

        Reads ``subscription.url`` and ``subscription.sub_msg`` and sets
        ``subscription.ws``. On any exception the error is printed and the
        subscription is retried via ``subscribe`` after 2 seconds.
        """
        try:
            # NOTE(review): the default argument of dict.get() is evaluated
            # before the lookup, so websockets.connect() is awaited on every
            # call, even when a connection for this URL is already stored.
            ws = self.connections.get(subscription.url, await websockets.connect(subscription.url))
            await ws.send(json.dumps(subscription.sub_msg))

            subscription.ws = ws
            self.connections[subscription.url] = ws
            self.subscriptions.add(subscription)
        except Exception as e:
            print("ERROR SUBSCRIBING; RETRYING", e)
            await asyncio.sleep(2)
            self.subscribe(subscription)

    def restart_socket(self, ws):
        """Drop the stored connection for ``ws`` and re-subscribe its users.

        Only subscriptions bound to ``ws`` whose socket reports
        ``connected`` as falsy are handled.
        """
        for s in self.subscriptions:
            # NOTE(review): relies on a `connected` attribute on the socket;
            # the question text reports that `websockets` objects lack it.
            if s.ws == ws and not s.ws.connected:
                print(s)
                # NOTE(review): presumably raises KeyError when a second
                # subscription shares the same URL - confirm.
                del self.connections[s.url]
                self.subscribe(s)
like image 239
bluppfisk Avatar asked Nov 17 '22 08:11

bluppfisk


1 Answer

I have a bog-standard synchronous python program that needs to be able to read data from websockets and update the GUI with the data. However, asyncio creep is constantly tripping me up.

Since you mention a GUI, it is probably not a "bog-standard synchronous python program". A GUI program usually has a non-blocking, event-driven main thread, which allows concurrent user actions and callbacks. That is very similar to asyncio, and a common way to make asyncio work together with a GUI is to replace asyncio's default event loop with a GUI-specific event loop, so that your asyncio coroutines simply run in the GUI event loop and you avoid calling run_forever(), which blocks everything.

An alternative is to run the asyncio event loop in a separate thread, so that your program can wait for websocket data and for user clicks at the same time. I've rewritten your code as follows:

import asyncio
import threading
import websockets
import json


class WSClient(threading.Thread):
    """Background thread that runs an asyncio loop with one websocket per URL.

    The thread owns a private event loop. The public methods (``subscribe``,
    ``unsubscribe``, ``stop``) may be called from any thread, before or after
    ``start()``: requests made before the loop exists are queued and replayed
    once it is created, and requests made after shutdown are ignored.

    Each subscribed URL gets exactly one listener task, which reconnects
    after a 2 second pause whenever the connection fails.
    """

    def __init__(self):
        super().__init__()
        self._loop = None          # created in run(), inside the worker thread
        self._tasks = {}           # url -> listener task; only touched on the loop
        self._stop_event = None    # asyncio.Event, created in run()
        self._lock = threading.Lock()  # guards _loop and _pending across threads
        self._pending = []         # callbacks requested before the loop existed

    def run(self):
        """Thread body: run the loop until ``stop()`` is requested, then clean up."""
        loop = asyncio.new_event_loop()
        # Make the loop current for this thread so loop-bound primitives pick
        # it up without the `loop=` argument (removed in Python 3.10).
        asyncio.set_event_loop(loop)
        self._stop_event = asyncio.Event()
        with self._lock:
            self._loop = loop
            # Replay requests that arrived before the loop was created.
            for callback in self._pending:
                loop.call_soon(callback)
            self._pending.clear()
        try:
            loop.run_until_complete(self._stop_event.wait())
            loop.run_until_complete(self._clean())
        finally:
            # Close under the lock so _call_in_loop never schedules onto a
            # loop that is being closed concurrently.
            with self._lock:
                loop.close()

    def _call_in_loop(self, callback):
        """Run ``callback`` on the loop thread; queue it if the loop is not up yet."""
        with self._lock:
            if self._loop is None:
                self._pending.append(callback)
            elif not self._loop.is_closed():
                self._loop.call_soon_threadsafe(callback)
            # else: the client has already shut down; drop the request.

    def _set_stop_event(self):
        """Loop-thread helper: signal shutdown."""
        self._stop_event.set()

    def stop(self):
        """Ask the loop to shut down. Safe to call repeatedly and from any thread."""
        self._call_in_loop(self._set_stop_event)

    def subscribe(self, url, sub_msg, callback):
        """Start listening on ``url`` and pass every decoded message to ``callback``.

        ``sub_msg`` is JSON-encoded and sent after each (re)connect. If ``url``
        already has a listener, the call is a no-op, so there is one
        connection and one callback per URL.
        """
        def _subscribe():
            # Do not spawn listeners once shutdown has begun; they would be
            # left pending when the loop closes.
            if self._stop_event.is_set() or url in self._tasks:
                return
            self._tasks[url] = self._loop.create_task(
                self._listen(url, sub_msg, callback))

        self._call_in_loop(_subscribe)

    def unsubscribe(self, url):
        """Cancel the listener for ``url``, if any."""
        def _unsubscribe():
            task = self._tasks.pop(url, None)
            if task is not None:
                task.cancel()

        self._call_in_loop(_unsubscribe)

    async def _listen(self, url, sub_msg, callback):
        """Connect to ``url``, subscribe, and dispatch messages until stopped."""
        this_task = asyncio.current_task()
        try:
            while not self._stop_event.is_set():
                try:
                    # `async with` closes the socket on error and on cancel.
                    async with websockets.connect(url) as ws:
                        await ws.send(json.dumps(sub_msg))
                        async for data in ws:
                            data = json.loads(data)

                            # NOTE: please make sure that `callback` won't
                            # block, and that it is allowed to update the GUI
                            # from threads. If not, you'll need to find a way
                            # to call it from the main/GUI thread (similar to
                            # `call_soon_threadsafe`).
                            callback(data)
                except Exception as e:
                    print('ERROR; RESTARTING SOCKET IN 2 SECONDS', e)
                    await asyncio.sleep(2)
        finally:
            # Only drop our own registration: after unsubscribe + subscribe
            # the slot may already belong to a newer task for the same URL.
            if self._tasks.get(url) is this_task:
                del self._tasks[url]

    async def _clean(self):
        """Cancel all listener tasks and wait for them to finish."""
        tasks = list(self._tasks.values())
        for task in tasks:
            task.cancel()
        # return_exceptions=True keeps the expected CancelledError of each
        # task from propagating out of run_until_complete().
        await asyncio.gather(*tasks, return_exceptions=True)
        # Tasks cancelled before their first step never reach their `finally`.
        self._tasks.clear()
like image 72
Fantix King Avatar answered Dec 26 '22 22:12

Fantix King