I'm using the pyzmq library with the pub/sub pattern. I have some fast ZMQ publishers using the .connect() method and a slower ZMQ subscriber using the .bind() method.
After a few minutes, my subscriber is receiving old data from the publishers, because the messages pile up in the ZMQ buffer.
Is there any way to manage the ZMQ queue buffer size (that is, to set a limit on the buffer)?
[NOTE]:
I tried the high watermark options, but they didn't work:
socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working
Publisher:
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working
while True:
    data = time.time()
    print("%d" % data)
    socket.send_string("%d" % data)  # send_string() encodes the text, so it also works on Python 3
    time.sleep(1)
Subscriber:
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty bytes prefix: subscribe to everything
socket.setsockopt(zmq.RCVHWM, 10)  # not working
while True:
    time.sleep(2)  # slows the subscriber down
    data = socket.recv()
    print(data)
The queue size is still more than 10 despite these options (the configured send/receive high watermarks).
I found an approach to get only the last message in a ZMQ subscriber, using the CONFLATE option.
Note that you should set the CONFLATE option before you connect:
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty bytes prefix: subscribe to everything
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.
while True:
    time.sleep(2)  # Dummy delay
    data = socket.recv()
    print(data)
In other words, the subscriber keeps only the most recent message instead of queuing older ones.
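To see the effect in a single process, here is a minimal sketch. The function name receive_latest, the random local port, the half-second sleeps and the receive timeout are all my own choices for illustration, not part of the scripts above. It sends a burst of messages while the subscriber is idle and then reads once; with CONFLATE set, that single read should return the newest message rather than the oldest.

```python
import time
import zmq


def receive_latest(n_messages=50):
    """Publish a burst of messages, then read once from a conflating subscriber."""
    context = zmq.Context()

    pub = context.socket(zmq.PUB)
    # Hypothetical setup: let pyzmq pick a free local port for the demo.
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    sub = context.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.setsockopt(zmq.CONFLATE, 1)     # keep only the last message; set before connect()
    sub.setsockopt(zmq.RCVTIMEO, 2000)  # raise zmq.Again instead of blocking forever
    sub.connect("tcp://127.0.0.1:%d" % port)

    time.sleep(0.5)  # give the subscription time to reach the publisher

    for i in range(n_messages):
        pub.send_string(str(i))

    time.sleep(0.5)  # the subscriber is "busy" while the burst arrives
    latest = sub.recv_string()

    sub.close(linger=0)
    pub.close(linger=0)
    context.term()
    return latest


if __name__ == "__main__":
    print(receive_latest())
```

Keep in mind that the libzmq documentation states that CONFLATE does not support multipart messages and that it ignores the RCVHWM and SNDHWM settings, so it only fits cases where each update is a single frame and older updates can be thrown away.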
[NOTE]:
In addition, the zmq.SNDBUF and zmq.RCVBUF options set the size of the kernel send and receive buffers of the underlying transport, which limits how much data can pile up outside the ZMQ queues themselves.
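If you need to keep more than one message, it may be worth retrying the high watermark with the options set before connect() or bind(). The libzmq documentation says that most socket options only apply to connections made after the option is set, and the scripts in the question set the HWM afterwards, which may be part of why it appeared to do nothing. The sketch below is an assumption-laden helper, not a confirmed fix: the name make_bounded_subscriber and the default sizes are hypothetical, and the real backlog can still exceed the HWM because the kernel TCP buffers hold messages too.

```python
import zmq


def make_bounded_subscriber(context, endpoint, hwm=10, rcvbuf_bytes=4096):
    """Create a SUB socket whose limits are configured before it connects."""
    sub = context.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVHWM, hwm)           # max messages queued inside ZMQ per connection
    sub.setsockopt(zmq.RCVBUF, rcvbuf_bytes)  # kernel receive buffer size, in bytes
    sub.setsockopt(zmq.SUBSCRIBE, b"")        # subscribe to everything
    sub.connect(endpoint)                     # options above apply to this connection
    return sub


if __name__ == "__main__":
    ctx = zmq.Context()
    subscriber = make_bounded_subscriber(ctx, "tcp://localhost:5556")
    print(subscriber.getsockopt(zmq.RCVHWM))
    subscriber.close(linger=0)
    ctx.term()
```

The same ordering applies on the publisher side with zmq.SNDHWM and zmq.SNDBUF. When a PUB socket reaches its high watermark it drops messages for that subscriber rather than blocking.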