Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Get subscriber filter from a ZMQ PUB socket

I noticed in the FAQ, in the Monitoring section, that it's not possible to get a list of connected peers or to be notified when peers connect/disconnect.

Does this imply that it's also not possible to know which topics a PUB/XPUB socket knows it should publish, from its upstream feedback? Or is there some way to access that data?

I know that ZMQ >= 3.0 "supports PUB/SUB filtering at the publisher", but what I really want is to filter at my application code, using the knowledge ZMQ has about which topics are subscribed to.

My use-case is that I want to publish info about the status of a robot. Some topics involve major hardware actions, like switching the select lines on an ADC to read IR values.

I have a publisher thread running on the bot that should only do that "read" to get IR data when there are actually subscribers. However, since I can only feed a string into my pub_sock.send, I always have to do the costly operation, even if ZMQ is about to drop that message when there are no subscribers.

I have an implementation that uses a backchannel REQ/REP socket to send topic information, which my app can check in its publish loop, thereby only collecting data that needs to be collected. This seems very inelegant though, since ZMQ must already have the data I need, as evidenced by its filtering at the publisher.

I noticed that in this mailing list message, the OP seems to be able to see subscribe messages being sent to an XPUB socket.

However, there's no mention of how they did that, and I'm not seeing any such ability in the docs (still looking). Maybe they were just using Wireshark (to see upstream subscribe messages to an XPUB socket).

like image 233
dfarrell07 Avatar asked Feb 07 '14 19:02

dfarrell07


People also ask

What is Zmq pub sub?

Pub/Sub is a pattern where the publisher is not programmed to send a message (payload) to a specific receiver. These messages are sent by publishers to specific channels, and receivers can subscribe to one or more channels to consume those same messages.

How do I know if my Zmq socket is connected?

No, there's no method in the API to check if a socket is connected. ZeroMq abstracts the network; client and server connections are completely transparent to the peer making the connection.

What is Zmq socket?

ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast.

Does pub/sub use TCP?

To track subscriptions, Redis uses a global variable pubsub_channels which maps channel names to sets of subscribed client objects. A client object represents a TCP-connected client by tracking that connection's file descriptor.


2 Answers

Using zmq.XPUB socket type, there is a way to detect new and leaving subscribers. The following code sample shows how:

# Publisher side
import zmq

ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket)

events = dict(poller.poll(1000))
if xpub_socket in events:
    msg = xpub_socket.recv()
    if msg[0] == b'\x01':
        topic = msg[1:]
        print "Topic '%s': new subscriber" % topic
    elif msg[0] == b'\x00':
        topic = msg[1:]
        print "Topic '%s': subscriber left" % topic

Note that the zmq.XSUB socket type does not subscribe in the same manner as the "normal" zmq.SUB. Code sample:

# Subscriber side
import zmq
ctx = zmq.Context.instance()

# Subscribing of zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)

# Subscribing zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher

I'd also like to point out the zmq.XPUB_VERBOSE socket option. If set, all subscription events are received on the socket. If not set, duplicate subscriptions are filtered. See also the following post: ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

like image 115
Freek Wiekmeijer Avatar answered Sep 19 '22 20:09

Freek Wiekmeijer


At least for the XPUB/XSUB socket case you can save a subscription state by forwarding and handling the packages manually:

context = zmq.Context()

xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind('tcp://*:10000')
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind('tcp://*:10001')

poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)

while True:
    try:
        events = dict(poller.poll(1000))
    except KeyboardInterrupt:
        break

    if xpub_socket in events:
        message = xpub_socket.recv_multipart()

        # HERE goes some subscription handle code which inspects
        # message

        xsub_socket.send_multipart(message)
    if xsub_socket in events:
        message = xsub_socket.recv_multipart()
        xpub_socket.send_multipart(message)

(this is Python code but I guess C/C++ looks quite similar)

I'm currently working on this topic and I will add more information as soon as possible.

like image 31
frans Avatar answered Sep 18 '22 20:09

frans