
How to have limited ZMQ (ZeroMQ - PyZMQ) queue buffer size in python?

I'm using the pyzmq library with the pub/sub pattern. I have some fast ZMQ publishers using the .connect() method and a slower ZMQ subscriber using the .bind() method. After a few minutes, my subscriber is still receiving old data from the publishers because of the ZMQ buffer.


My Question:

Is there any approach to manage ZMQ queue buffer size? (set a limited buffer)

[NOTE]:

  • I don't want to use ZMQ PUSH/PULL.
  • I've read this post, but that approach only clears the buffer: clear ZMQ buffer
  • I also tried with high watermark options, but it didn't work:
socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working

Publisher:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working

while True:
    data = time.time()
    print("%d" % data)
    socket.send_string("%d" % data)  # send_string encodes str to bytes (needed on Python 3)
    time.sleep(1)

Subscriber:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: subscribe to every message
socket.setsockopt(zmq.RCVHWM, 10)  # not working

while 1:
    time.sleep(2)  # Simulate a slow consumer.
    data = socket.recv()
    print(data)

Despite setting the send/receive high-water marks, the queue still holds more than 10 messages.

Benyamin Jafari asked Jan 16 '18 10:01



1 Answer

I found a way to receive only the latest message in a ZMQ subscriber: the CONFLATE option.

Note that you should set the CONFLATE option before you connect:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: subscribe to every message
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while 1:
    time.sleep(2)  # Dummy delay
    data = socket.recv()
    print(data)

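In other words, the subscriber keeps only the most recent message in its queue and discards anything older, so a slow subscriber never works through a backlog of stale data.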
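To see the effect without running two separate scripts, here is a minimal single-process sketch. The function name latest_only_demo, the random local port, and the sleep durations are my own assumptions for illustration; the sleeps are only there to let the subscription and the messages propagate.

```python
import time
import zmq


def latest_only_demo(n_messages=50):
    """Publish a burst of messages and return what a CONFLATE subscriber sees."""
    ctx = zmq.Context()

    pub = ctx.socket(zmq.PUB)
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    sub = ctx.socket(zmq.SUB)
    # Both options are set before connect(), as in the answer above.
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.setsockopt(zmq.CONFLATE, 1)
    sub.connect("tcp://127.0.0.1:%d" % port)
    time.sleep(0.5)  # give the subscription time to reach the publisher

    # The subscriber is not reading while this burst is sent.
    for i in range(n_messages):
        pub.send_string(str(i))
    time.sleep(0.5)  # let the burst arrive on the subscriber side

    # Drain whatever is still queued on the subscriber.
    received = []
    while sub.poll(200):  # wait up to 200 ms for another message
        received.append(sub.recv_string())

    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(latest_only_demo())
```

With CONFLATE enabled I would expect the returned list to be much shorter than the burst and to end with the last value sent. One caveat from the ZeroMQ documentation: CONFLATE does not support multi-part messages.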


[NOTE]:

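In addition, the zmq.SNDBUF and zmq.RCVBUF options set the size, in bytes, of the kernel send and receive buffers of the underlying socket. This limits how much data the operating system holds on behalf of ZMQ, separately from the message-count limit set by the high-water marks. (More information)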
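If you need a small bounded queue rather than "latest only", a rough sketch is to combine the high-water mark with the kernel buffer size and set both before connecting. The libzmq documentation says most socket options only take effect for subsequent bind/connect calls, which may be why the options appeared not to work in the question. The helper name make_bounded_sub and the values 10 and 4096 are assumptions for illustration, not recommended settings.

```python
import zmq


def make_bounded_sub(endpoint, hwm=10, rcvbuf_bytes=4096):
    """Create a SUB socket whose limits are configured before connect()."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.setsockopt(zmq.RCVHWM, hwm)           # limit counted in messages
    sock.setsockopt(zmq.RCVBUF, rcvbuf_bytes)  # kernel receive buffer, in bytes
    sock.setsockopt(zmq.SUBSCRIBE, b"")        # subscribe to every message
    sock.connect(endpoint)                     # options above apply to this connection
    return sock


if __name__ == "__main__":
    sub = make_bounded_sub("tcp://localhost:5556")
    print(sub.getsockopt(zmq.RCVHWM), sub.getsockopt(zmq.RCVBUF))
    sub.close(linger=0)
```

Even with this, the number of queued messages is not an exact cap: the publisher has its own SNDHWM and send buffer, and small messages can still sit in the TCP buffers on both ends.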


Benyamin Jafari answered Nov 15 '22 10:11