So I was reading this article on how to create a proxy/broker for (X)PUB/(X)SUB messaging in ZMQ. There is this nice picture of what the architecture should look like:
But when I look at the XSUB socket description, I do not get how to forward all subscriptions through it, given that its outgoing routing strategy is N/A.
So how should one implement (un)subscription forwarding in ZeroMQ? What is the minimal user code for such a forwarding application (one that can be inserted between the simple Publisher and Subscriber samples)?
XPUB does receive messages - the only messages it receives are subscriptions from connected subscribers, and these messages should be forwarded upstream as-is via XSUB.
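To make "forwarded as-is" concrete: a subscription message read from XPUB is a single frame whose first byte is 1 (subscribe) or 0 (unsubscribe), followed by the topic prefix. Here is a minimal sketch of decoding such a frame. Note that parse_subscription is a hypothetical helper name used for illustration, not something pyzmq provides:

```python
def parse_subscription(frame: bytes):
    """Split an XPUB subscription frame into (action, topic_prefix).

    Hypothetical helper for illustration; not part of pyzmq.
    """
    if not frame or frame[0] not in (0, 1):
        raise ValueError("not a subscription frame: %r" % (frame,))
    # First byte: 1 = subscribe, 0 = unsubscribe. The rest is the topic prefix.
    action = "subscribe" if frame[0] == 1 else "unsubscribe"
    return action, frame[1:]


if __name__ == "__main__":
    print(parse_subscription(b"\x01weather"))
    print(parse_subscription(b"\x00weather"))
```

A broker that only relays never needs to decode the frame; this is only useful if you want to log or filter subscriptions before passing them to XSUB.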
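The very simplest way to relay messages is with zmq_proxy:

    import zmq

    ctx = zmq.Context.instance()
    # xpub_url, xsub_url and pub_url are endpoint strings defined elsewhere
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(xpub_url)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(xsub_url)
    pub = ctx.socket(zmq.PUB)  # optional capture socket
    pub.bind(pub_url)
    zmq.proxy(xpub, xsub, pub)  # blocks while relaying
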
which will relay messages to/from xpub and xsub. Optionally, you can add a PUB socket to monitor the traffic that passes through in either direction.
If you want user code in the middle to implement extra routing logic, you would do something like this, which re-implements the inner loop of zmq_proxy:

    def broker(ctx):
        # xpub_url and xsub_url are endpoint strings defined elsewhere
        xpub = ctx.socket(zmq.XPUB)
        xpub.bind(xpub_url)
        xsub = ctx.socket(zmq.XSUB)
        xsub.bind(xsub_url)

        poller = zmq.Poller()
        poller.register(xpub, zmq.POLLIN)
        poller.register(xsub, zmq.POLLIN)
        while True:
            events = dict(poller.poll(1000))
            if xpub in events:
                # (un)subscriptions arrive on XPUB; forward them upstream via XSUB
                message = xpub.recv_multipart()
                print("[BROKER] subscription message: %r" % message[0])
                xsub.send_multipart(message)
            if xsub in events:
                # published messages arrive on XSUB; forward them downstream via XPUB
                message = xsub.recv_multipart()
                # print("publishing message: %r" % message)
                xpub.send_multipart(message)
            # insert user code here

full working (Python) example
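Since the question asks for something that can sit between a simple publisher and subscriber, here is a rough end-to-end sketch of that wiring. It is only an illustration under a few assumptions: the inproc endpoint names, the `weather` topic, the stop/ready events, and the retry loop in `demo` are all made up for this example, and everything runs in one process so it can be tried without opening ports. It needs pyzmq installed.

```python
import threading
import time

import zmq

# Hypothetical endpoints for this sketch; use tcp://... for separate processes.
XSUB_URL = "inproc://broker-xsub"  # publishers connect here
XPUB_URL = "inproc://broker-xpub"  # subscribers connect here


def run_broker(ctx, stop, ready):
    """Relay subscriptions upstream and publications downstream until stopped."""
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(XPUB_URL)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(XSUB_URL)
    ready.set()  # both endpoints are bound; clients may connect now

    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)
    poller.register(xsub, zmq.POLLIN)
    while not stop.is_set():
        events = dict(poller.poll(100))
        if xpub in events:
            xsub.send_multipart(xpub.recv_multipart())  # (un)subscriptions
        if xsub in events:
            xpub.send_multipart(xsub.recv_multipart())  # published messages
    xpub.close(linger=0)
    xsub.close(linger=0)


def demo(timeout=5.0):
    """Publish through the broker until the subscriber receives one message."""
    ctx = zmq.Context()
    stop, ready = threading.Event(), threading.Event()
    broker = threading.Thread(target=run_broker, args=(ctx, stop, ready))
    broker.start()
    ready.wait()

    sub = ctx.socket(zmq.SUB)
    sub.connect(XPUB_URL)
    sub.setsockopt(zmq.SUBSCRIBE, b"weather")
    pub = ctx.socket(zmq.PUB)
    pub.connect(XSUB_URL)

    received = None
    deadline = time.time() + timeout
    while received is None and time.time() < deadline:
        # Resend until the subscription has propagated through the broker.
        pub.send_multipart([b"weather", b"sunny"])
        if sub.poll(100):
            received = sub.recv_multipart()

    stop.set()
    broker.join()
    pub.close(linger=0)
    sub.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(demo())
```

The publisher resends in a loop because PUB drops messages until the subscription has travelled SUB -> XPUB -> XSUB -> PUB, which is exactly the forwarding path described above.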