 

Listen to multiple sockets with websockets and asyncio

I am trying to create a Python script that listens to multiple sockets using websockets and asyncio. The problem is that no matter what I do, it only listens to the first socket I call. I think it's the infinite loop. What are my options to solve this? Should I use a thread for each socket?

  async def start_socket(self, event):
    payload = json.dumps(event)
    loop = asyncio.get_event_loop()

    self.tasks.append(loop.create_task(
        self.subscribe(event)))

    # this should not block the rest of the code
    await asyncio.gather(*tasks)


  def test(self):
    # I want to be able to add coroutines at different times
    self.start_socket(event1)
    # some code
    self.start_socket(event2)
joseRo asked Jan 28 '23 07:01


2 Answers

Your code appears incomplete, but what you've shown has two issues. One is that run_until_complete accepts a coroutine object (or other kind of future), not a coroutine function. So it should be:

# note parentheses after your_async_function()
asyncio.get_event_loop().run_until_complete(your_async_function())
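To make the difference concrete, here is a minimal sketch. The name your_async_function is a placeholder and the return value 42 is arbitrary; a fresh loop is created explicitly so the snippet runs on its own.

```python
import asyncio

async def your_async_function():
    # placeholder coroutine function; stands in for the real work
    await asyncio.sleep(0)
    return 42

loop = asyncio.new_event_loop()
try:
    # Calling the function creates a coroutine object, which the loop can run.
    result = loop.run_until_complete(your_async_function())

    # Passing the function itself (no parentheses) is rejected with TypeError.
    try:
        loop.run_until_complete(your_async_function)
        error = None
    except TypeError as exc:
        error = exc
finally:
    loop.close()
```

On Python 3.7 and later, asyncio.run(your_async_function()) is the usual shorthand for the successful call.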

"the problem is that no matter what I do it only listen to the first socket I call. I think its the infinite loop, what are my option to solve this? using threads for each sockets?"

The infinite loop is not the problem; asyncio is designed to support such "infinite loops". The problem is that you are trying to do everything in one coroutine, whereas you should create one coroutine per websocket. Doing so is cheap, as coroutines are very lightweight.

For example (untested):

import asyncio
import websockets

async def subscribe_all(self, payload):
    # url_list is assumed to be defined elsewhere
    # create one task per URL; each task owns its own websocket connection
    tasks = [asyncio.create_task(self.subscribe_one(url, payload))
             for url in url_list]
    # run all tasks concurrently; returns only once every task has finished
    await asyncio.gather(*tasks)

async def subscribe_one(self, url, payload):
    async with websockets.connect(url) as websocket:
        await websocket.send(payload)
        # receive loop; each await hands control back to the event loop
        while True:
            msg = await websocket.recv()
            print(msg)
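The question also asks how to add subscriptions at different times. One way to sketch that, assuming everything runs inside a single event loop, is to start each subscription with asyncio.create_task and not await it right away. To stay runnable without a server, the hypothetical fake_subscribe below stands in for subscribe_one, with asyncio.sleep(0) playing the role of waiting on websocket.recv().

```python
import asyncio

received = []  # messages collected from every simulated subscription

async def fake_subscribe(name, count):
    # Hypothetical stand-in for subscribe_one(): each sleep(0) yields to the
    # event loop just as `await websocket.recv()` would while waiting.
    for i in range(count):
        await asyncio.sleep(0)
        received.append((name, i))

async def main():
    # create_task schedules the coroutine and returns immediately.
    first = asyncio.create_task(fake_subscribe("a", 3))
    await asyncio.sleep(0)  # other work; the first task is already running
    # A second subscription is added later, while the first is still active.
    second = asyncio.create_task(fake_subscribe("b", 3))
    # Wait only at the very end, to keep the program alive.
    await asyncio.gather(first, second)

asyncio.run(main())
```

The point of the design is that awaiting gather (or anything else) suspends only the coroutine that awaits; tasks created earlier keep running in the meantime.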
user4815162342 answered Jan 30 '23 21:01


This is what I did eventually. That way it's not blocking the main thread, and all subscriptions work in parallel.

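import asyncio
import ssl
from threading import Thread

import websocket  # presumably the websocket-client package

def subscribe(self, payload):
    # url is assumed to be defined elsewhere
    # note: cert_reqs=CERT_NONE disables certificate verification
    ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
    ws.connect(url)
    ws.send(payload)
    # blocking receive loop; it occupies the worker thread's event loop
    while True:
        result = ws.recv()
        print("Received '%s'" % result)

def start_thread(self, loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def start_socket(self, payload):
    # one event loop and one thread per subscription
    worker_loop = asyncio.new_event_loop()
    worker = Thread(target=self.start_thread, args=(worker_loop,))
    worker.start()

    worker_loop.call_soon_threadsafe(self.subscribe, payload)


def listen(self):
    self.start_socket(payload1)
    # code
    self.start_socket(payload2)
    # code
    self.start_socket(payload3)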
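A possible variation, sketched here as an assumption rather than as the code used above: because ws.recv() blocks, each worker loop is fully occupied by a single subscription. If the subscriptions are written as coroutines instead, one background loop can host all of them, and synchronous code can still add new ones at any time with asyncio.run_coroutine_threadsafe. The fake_subscribe coroutine is a hypothetical stand-in that returns after a few simulated messages so that the example terminates.

```python
import asyncio
from threading import Thread

async def fake_subscribe(payload, count):
    # Hypothetical stand-in for an async websocket subscription.
    messages = []
    for i in range(count):
        await asyncio.sleep(0.01)  # plays the role of `await websocket.recv()`
        messages.append(f"{payload}:{i}")
    return messages

# One event loop, running forever in a single daemon thread.
loop = asyncio.new_event_loop()
worker = Thread(target=loop.run_forever, daemon=True)
worker.start()

# Submit coroutines from the main thread; each call returns a
# concurrent.futures.Future immediately, so the caller is not blocked.
future1 = asyncio.run_coroutine_threadsafe(fake_subscribe("payload1", 2), loop)
# ... other code can run here ...
future2 = asyncio.run_coroutine_threadsafe(fake_subscribe("payload2", 2), loop)

# result() blocks only when the caller chooses to wait.
results = [future1.result(timeout=5), future2.result(timeout=5)]

# Stop the loop from the outside, then clean up.
loop.call_soon_threadsafe(loop.stop)
worker.join(timeout=5)
loop.close()
```

With real websockets the subscriptions would never return, so the caller would keep the futures only to cancel them or to inspect errors.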
joseRo answered Jan 30 '23 20:01