when I read the "Durable Subscribers and High-Water Marks" in zmq guide, it said "The HWM causes ØMQ to drop messages it can't put onto the queue", but no messages lost when I ran the example. Hit ctrl+c to terminate the durasub.py and then continue it.
durasub.py
import zmq
import time
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")
sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")
while True:
data = subscriber.recv()
print data
if data == "END":
break
durapub.py
import zmq
import time
context = zmq.Context()
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")
publisher.setsockopt(zmq.HWM, 2)
sync_request = sync.recv()
for n in xrange(10):
msg = "Update %d" % n
publisher.send(msg)
time.sleep(1)
publisher.send("END")
ZMQ uses the concept of HWM (high-water mark) to define the capacity of it's internal pipes. Each connection out of a socket or into a socket has its own pipe, and HWM for sending, and/or receiving, depending on the socket type. Some sockets ( PUB , PUSH ) only have send buffers.
It starts by immediately sending messages to each of a thousand topics, and then it sends one update a second to a random topic. A subscriber connects, and subscribes to a topic. Without LVC, a subscriber would have to wait an average of 500 seconds to get any data.
Who uses ZeroMQ? 54 companies reportedly use ZeroMQ in their tech stacks, including Alibaba Travels, energy2market, and XING.
ZeroMQ guarantees to deliver all the parts (one or more) for a message, or none of them. This allows you to send or receive a list of frames as a single on-the-wire message. A message (single or multipart) must fit in memory.
The suggestion above is valid, but doesn't properly address the problem in this particular code.
The real problem here is that in durapub.py
you call publisher.setsockopt(zmq.HWM, 2)
AFTER calling publisher.bind
. You should call setsockopt
BEFORE bind
or connect
.
Please refer to 0MQ API documentation for setsockopt:
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE and ZMQ_LINGER, only take effect for subsequent socket bind/connects.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With