I noticed that a zeromq PUB socket will buffers all outgoing data if it is connecting, for example
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print sub.recv()
The sub binds after those messages, they should be dropped, because PUB should drop messages if no one connected to it. But instead of dropping messages, it buffers all messages.
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi
As you can see, those "a message should not be dropped" are buffered by the socket, once it gets connected, it flush them to SUB socket. If I bind at the PUB socket, and connect at the SUB socket, then it works correctly.
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print repr(sub.recv())
And you can only see the output
'hi'
This kind of strange behavior cause a problem, it buffers all data on a connecting socket, I have two servers, server A publishes data to server B
Server A -- publish --> Server B
It works fine if server B gets online. But what if I start the Server A and do not start Server B?
As the result, the connecting PUB socket on Server A keeps all those data, the memory usage gets higher and higher.
Here is the problem, is this kind of behavior a bug or feature? If it is feature, where can I find a document that mentions this behavior? And how can I stop the connecting PUB socket buffers all data?
Thanks.
ZeroMQ patterns are implemented by pairs of sockets with matching types. The built-in core ZeroMQ patterns are: Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
ZeroMQ is asynchronous brokerless signalling / messaging framework.
The ZeroMQ Message Transport Protocol (ZMTP) is a transport layer protocol for exchanging messages between two peers over a connected transport layer such as TCP. This document describes ZMTP/2.0. In theory, ZMTP should define fully interoperable behavior between implementations.
ZeroMQ is an asynchronous network messaging library known for its high performance.
Whether the socket blocks or drops messages depends on the socket type as described in the ZMQ::Socket documentation (emphasis below is mine):
ZMQ::HWM: Retrieve high water mark
The ZMQ::HWM option shall retrieve the high water mark for the specified socket. The high water mark is a hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer that the specified socket is communicating with.
If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, 0MQ shall take appropriate action such as blocking or dropping sent messages. Refer to the individual socket descriptions in ZMQ::Socket for details on the exact action taken for each socket type.
The default ZMQ::HWM value of zero means “no limit”.
You can see if it will block or drop by looking through the documentation for the socket type for ZMQ::HWM option action
which will either be Block
or Drop
.
The action for ZMQ::PUB
is Drop
, so if it is not dropping you should check the HWM (High Water Mark) value and heed the warning that The default ZMQ::HWM value of zero means “no limit”, meaning that it will not enter an exceptional state until the system runs out of memory (at which point I don't know how it behaves).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With