
ZeroRPC publish subscribe

I would like to set up an event-based system between my servers. For example, when the server that wraps my database logic changes state, I would like for it to notify my other servers. A publish/subscribe design seems ideal for this, and I have heard good things about ZeroRPC.

Some people have mentioned using zerorpc streaming to accomplish pub/sub; however, it's not obvious to me how firing events would work using streaming.

user1161657 asked Dec 16 '22 20:12


1 Answer

At dotCloud we use a lot of pub/sub through zerorpc streaming. Let me describe how we do it.

In summary

We expose a streaming method decorated with @zerorpc.stream. When called, this method adds a gevent.queue.Queue to a set, then loops forever, yielding every message that arrives on the queue. When the method terminates (because the client disconnected), the queue is removed from the set.

To publish, simply put the message on every queue registered in the set. At this point you have to decide what to do about slow consumers (disconnect them, queue for them up to a certain limit, and/or discard new messages).

Implementation example with zerorpc-python:

The subscribing part

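import gevent.queue
import zerorpc


class MyService(object):
    def __init__(self):
        # One queue per connected subscriber.
        self._subscribers = set()

    @zerorpc.stream
    def subscribe(self):
        # Create the queue before the try block so that the
        # finally clause never sees an unbound name.
        queue = gevent.queue.Queue()
        self._subscribers.add(queue)
        try:
            # Runs until StopIteration is put on the queue
            # or the greenlet is killed.
            for msg in queue:
                yield msg
        finally:
            self._subscribers.remove(queue)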

The subscribe method simply adds a queue to the set, then consumes the queue forever until either:

- the queue is ended by a StopIteration message (see the gevent.queue.Queue documentation), or
- the greenlet running the subscribe function is killed (usually because the client disconnected).

In both cases, the finally clause is executed and the queue is removed from the set of subscribers.
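As a rough illustration of why the cleanup runs on both exit paths, here is a minimal sketch that uses only the standard library. These are assumptions made for the example, not zerorpc's API: a plain generator stands in for the @zerorpc.stream method, the stdlib queue.Queue stands in for gevent.queue.Queue, and closing the generator stands in for the greenlet being killed. The explicit StopIteration check mimics what iterating over a gevent queue does for you, and the function signature is hypothetical.

```python
import queue  # stdlib stand-in for gevent.queue (assumption for this sketch)


def subscribe(subscribers, q):
    """Yield messages from q, keeping q registered only while running."""
    subscribers.add(q)
    try:
        while True:
            msg = q.get()
            if msg is StopIteration:  # end-of-stream marker, as with gevent queues
                return
            yield msg
    finally:
        # Reached on normal return and when the generator is closed.
        subscribers.discard(q)


subscribers = set()

# Exit path 1: the stream is ended by a StopIteration marker.
q1 = queue.Queue()
q1.put("a")
q1.put(StopIteration)
received = list(subscribe(subscribers, q1))
after_marker = set(subscribers)

# Exit path 2: the consumer goes away mid-stream (generator closed).
q2 = queue.Queue()
q2.put("b")
stream = subscribe(subscribers, q2)
first = next(stream)
registered_while_running = q2 in subscribers
stream.close()  # raises GeneratorExit at the yield, so finally runs
after_close = set(subscribers)
```

In both cases the set ends up empty again, which is the property the real subscribe method relies on.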

Note that it is possible to limit the size of the queue at this point: ...Queue(maxsize=42).

The publishing part

class MyService(object):
    [...]

    def _publish(self, msg):
        for queue in self._subscribers:
            # Drop the message for subscribers with too many pending messages.
            if queue.qsize() < 42:
                queue.put(msg)
Call this method to publish a message. It iterates over all the subscribers' queues and puts the message on each one. In my example, if a queue reaches a specific size, I discard the message for that subscriber. But there is no limit to the kind of pattern you can apply there.
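The discard policy can also be expressed with a bounded queue and a non-blocking put. The sketch below is one possible shape, not the code we run: the Publisher class and its method names are hypothetical, and it uses the standard library's queue module so it runs without gevent (gevent.queue offers the same Queue(maxsize=...), put_nowait and Full names).

```python
import queue  # stdlib stand-in; gevent.queue exposes the same Queue/Full names


class Publisher:
    """Hypothetical helper: fan a message out to every subscriber queue."""

    def __init__(self, max_pending=42):
        self._max_pending = max_pending
        self._subscribers = set()

    def add_subscriber(self):
        # A bounded queue makes the per-subscriber limit explicit.
        q = queue.Queue(maxsize=self._max_pending)
        self._subscribers.add(q)
        return q

    def publish(self, msg):
        """Deliver msg where there is room; return how many subscribers missed it."""
        dropped = 0
        for q in self._subscribers:
            try:
                q.put_nowait(msg)  # never block the publisher
            except queue.Full:
                dropped += 1  # slow consumer: discard the new message
        return dropped


pub = Publisher(max_pending=2)
fast, slow = pub.add_subscriber(), pub.add_subscriber()
drops = []
for i in range(3):
    drops.append(pub.publish(i))
    fast.get_nowait()  # the fast consumer drains immediately; slow never reads
```

Returning the number of drops is just one way to make the policy observable; logging or a counter per subscriber would work as well.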

You can store the subscriber's greenlet instance in the set and kill it when its queue is full, effectively disconnecting the slow client (you can even try to send a message informing the client that it is too slow). You could also wait for all your consumers to process the messages in parallel before returning from _publish, etc. The sky's the limit, my friend!
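Here is a hedged sketch of the "disconnect slow clients" idea. It only shows the bookkeeping: the function name and the TOO_SLOW notice are made up for the example, and the standard library's queue module stands in for gevent.queue. In a real gevent service you would additionally stop the subscriber's greenlet (or end its queue) once it has been evicted.

```python
import queue  # stdlib stand-in for gevent.queue (assumption for this sketch)

# Hypothetical notice sent to a client that is about to be disconnected.
TOO_SLOW = {"event": "disconnected", "reason": "too slow"}


def publish_or_evict(subscribers, msg):
    """Put msg on every queue; evict any subscriber whose queue is full.

    Returns the list of evicted queues.
    """
    evicted = []
    for q in list(subscribers):  # copy: the set is mutated while looping
        try:
            q.put_nowait(msg)
        except queue.Full:
            q.get_nowait()          # drop the oldest pending message to make room
            q.put_nowait(TOO_SLOW)  # tell the client why it is being cut off
            subscribers.discard(q)
            evicted.append(q)
    return evicted


slow = queue.Queue(maxsize=1)
fast = queue.Queue(maxsize=5)
subs = {slow, fast}

first_round = publish_or_evict(subs, "m1")   # both queues have room
fast.get_nowait()                            # only the fast consumer reads
second_round = publish_or_evict(subs, "m2")  # slow is full and gets evicted
```

Sacrificing the oldest pending message to deliver the notice is a design choice; you could just as well drop the notice and disconnect silently.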

Hope that helps!

bombela answered Feb 27 '23 11:02