How to manage websockets across multiple servers / workers

aiohttp has built-in support for websockets. It's very simple and works well.

A simplified version of the example in the docs is:

from aiohttp import web


async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Async iterate the messages the client sends
    async for message in ws:
        # send_str() is a coroutine in aiohttp 3.x and must be awaited
        await ws.send_str('You sent: %s' % (message.data,))

    print('websocket connection closed')
    return ws

In the example, ws is a reference to a websocket connection with a client. I could easily put these references into request.app, as @Crandel does here (i.e., global state), but that won't work in a production app, because each app server (and even each worker) will have its own app instance.
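To make the limitation concrete, here is a minimal sketch that assumes two workers, each modelled as a plain dict standing in for its own app instance. `FakeSocket`, `make_app`, and `broadcast` are hypothetical names used only for illustration; they are not aiohttp APIs.

```python
import asyncio


class FakeSocket:
    """Stand-in for web.WebSocketResponse; records what it is sent."""

    def __init__(self):
        self.sent = []

    async def send_str(self, text):
        self.sent.append(text)


def make_app():
    # Each worker process builds its own app, hence its own registry.
    return {"websockets": []}


async def broadcast(app, text):
    # Reaches only the sockets registered in this particular app.
    for ws in list(app["websockets"]):
        await ws.send_str(text)


async def demo():
    worker_a, worker_b = make_app(), make_app()
    client_a, client_b = FakeSocket(), FakeSocket()
    worker_a["websockets"].append(client_a)
    worker_b["websockets"].append(client_b)

    # An event handled by worker B never reaches worker A's client.
    await broadcast(worker_b, "event from B")
    return client_a.sent, client_b.sent


sent_a, sent_b = asyncio.run(demo())
print(sent_a, sent_b)
```

The client registered with worker A receives nothing, because worker B's registry has no reference to it.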

Is there an accepted pattern for this? Is there another way?

Note: I'm not referring to sessions. I'm referring to connections. I want to send a message to clients that connected to server A when events occur in application code in server B, etc.

orokusaki asked Mar 05 '16 22:03


1 Answer

If I'm understanding you correctly, you want to have multiple websocket servers, each with multiple clients connected, but you want to be able to communicate potentially with all of the connected clients.

Here is an example that creates three trivial servers -- a capitalization echo, a random quote, and time of day -- and then sends a broadcast message to all of the connected clients. Maybe this has some useful ideas in it.

Pastebin: https://pastebin.com/xDSACmdV

#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.

In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers

Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser

import aiohttp
from aiohttp import web

__author__ = "Robert Harder"
__email__ = "[email protected]"
__license__ = "Public Domain"


def main():
    # Create servers
    cap_srv = CapitalizeEchoServer(port=9990)
    rnd_srv = RandomQuoteServer(port=9991)
    tim_srv = TimeOfDayServer(port=9992)

    # Queue their start operation
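    # Calling get_event_loop() with no running loop is deprecated in newer Pythons
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)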
    loop.create_task(cap_srv.start())
    loop.create_task(rnd_srv.start())
    loop.create_task(tim_srv.start())

    # Open web pages to test them
    webtests = [9990, 9991, 9991, 9992, 9992]
    for port in webtests:
        url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
        webbrowser.open(url)
    print("Be sure to click 'Connect' on the webpages that just opened.")

    # Queue a simulated broadcast-to-all message
    def _alert_all(msg):
        print("Sending alert:", msg)
        msg_dict = {"alert": msg}
        cap_srv.broadcast_message(msg_dict)
        rnd_srv.broadcast_message(msg_dict)
        tim_srv.broadcast_message(msg_dict)

    loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")

    # Run event loop
    loop.run_forever()


class MyServer:
    def __init__(self, port):
        self.port = port  # type: int
        self.loop = None  # type: asyncio.AbstractEventLoop
        self.app = None  # type: web.Application
        self.srv = None  # type: asyncio.base_events.Server

    async def start(self):
        self.loop = asyncio.get_event_loop()
        self.app = web.Application()
        self.app["websockets"] = []  # type: [web.WebSocketResponse]
        self.app.router.add_get("/", self._websocket_handler)
        await self.app.startup()
        handler = self.app.make_handler()
        self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
        print("{} listening on port {}".format(self.__class__.__name__, self.port))

    async def close(self):
        assert self.loop is asyncio.get_event_loop()
        self.srv.close()
        await self.srv.wait_closed()

        # Iterate over a copy: handlers remove their socket from the list as they close
        for ws in list(self.app["websockets"]):  # type: web.WebSocketResponse
            await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')

        await self.app.shutdown()
        await self.app.cleanup()

    async def _websocket_handler(self, request):
        assert self.loop is asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.app["websockets"].append(ws)

        try:
            await self.do_websocket(ws)
        finally:
            # Always drop the reference, even if the handler raises or is cancelled
            self.app["websockets"].remove(ws)
        return ws

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            pass

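    def broadcast_message(self, msg: dict):
        for ws in list(self.app["websockets"]):  # type: web.WebSocketResponse
            # send_json() is a coroutine in aiohttp 3.x; schedule it as a task so
            # this method can still be called from plain callbacks (e.g. call_later)
            self.loop.create_task(ws.send_json(msg))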


class CapitalizeEchoServer(MyServer):
    """ Echoes back to client whatever they sent, but capitalized. """

    async def do_websocket(self, ws: web.WebSocketResponse):
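        async for ws_msg in ws:  # type: aiohttp.WSMessage
            if ws_msg.type != aiohttp.WSMsgType.TEXT:
                continue  # only text frames can be echoed with send_str()
            cap = ws_msg.data.upper()
            await ws.send_str(cap)  # send_str() is a coroutine in aiohttp 3.x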


class RandomQuoteServer(MyServer):
    """ Sends a random quote to the client every so many seconds. """
    QUOTES = ["Wherever you go, there you are.",
              "80% of all statistics are made up.",
              "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]

    def __init__(self, interval: float = 10, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self.interval = interval

    async def do_websocket(self, ws: web.WebSocketResponse):
        async def _regular_interval():
            # Keep sending until this client's connection has closed
            while not ws.closed:
                quote = random.choice(RandomQuoteServer.QUOTES)
                await ws.send_json({"quote": quote})
                await asyncio.sleep(self.interval)

        task = self.loop.create_task(_regular_interval())
        try:
            await super().do_websocket(ws)  # leave client connected here indefinitely
        finally:
            task.cancel()  # stop sending quotes once the client disconnects


class TimeOfDayServer(MyServer):
    """ Sends a message to all clients simultaneously about time of day. """

    async def start(self):
        await super().start()

        async def _regular_interval():
            # Server.sockets is never None on Python 3.8+, so ask the server directly
            while self.srv.is_serving():
                if int(time.time()) % 10 == 0:  # Only on the 10 second mark
                    timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
                    self.broadcast_message({"timestamp": timestamp})
                await asyncio.sleep(1)

        self.loop.create_task(_regular_interval())


if __name__ == "__main__":
    main()
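
The script above runs all three servers in one process, so each server's own list of sockets is enough. For workers in separate processes or on separate machines, as the question describes, one common arrangement (an assumption here, not part of the script) is for every worker to subscribe to a shared message broker and forward whatever it receives to its own local sockets. Below is a minimal sketch of that idea. The in-process `Bus` class is a hypothetical stand-in for a real broker such as Redis pub/sub, and `Worker` and `FakeSocket` are likewise illustrative names, not aiohttp APIs.

```python
import asyncio


class Bus:
    """In-process stand-in for a shared broker (hypothetical helper)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def publish(self, msg):
        # Every subscribed worker receives its own copy of the message.
        for queue in self._subscribers:
            queue.put_nowait(msg)


class FakeSocket:
    """Stand-in for web.WebSocketResponse; records what it is sent."""

    def __init__(self):
        self.sent = []

    async def send_json(self, msg):
        self.sent.append(msg)


class Worker:
    """One server process: local sockets plus a subscription to the bus."""

    def __init__(self, bus):
        self.websockets = []
        self._inbox = bus.subscribe()

    async def relay(self):
        # Forward each bus message to this worker's own clients only.
        while True:
            msg = await self._inbox.get()
            if msg is None:  # sentinel used here to stop the demo
                break
            for ws in list(self.websockets):
                await ws.send_json(msg)


async def demo():
    bus = Bus()
    worker_a, worker_b = Worker(bus), Worker(bus)
    client_a, client_b = FakeSocket(), FakeSocket()
    worker_a.websockets.append(client_a)
    worker_b.websockets.append(client_b)

    relays = [asyncio.create_task(w.relay()) for w in (worker_a, worker_b)]
    bus.publish({"alert": "hello"})  # application code on any server
    bus.publish(None)                # stop both relay loops
    await asyncio.gather(*relays)
    return client_a.sent, client_b.sent


sent_a, sent_b = asyncio.run(demo())
print(sent_a, sent_b)
```

Application code never touches a socket that belongs to another worker. It only publishes to the bus, and each worker delivers the message to the connections it holds itself.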
rharder answered Sep 28 '22 11:09