I'm having some trouble understanding how the ZeroMQ high-water mark (HWM) queues work.
I have made two scripts attached below, which reproduce the following.
The result I get is that the puller is able to receive (print) all messages successfully. Also, the pusher seems to finish execution almost instantly. According to the ZMQ official documentation what I would expect from that is the pusher to not finish execution before the puller awakes, because of being blocked on the second send(...)
call due to the HWM being reached. I have also tried adding a 0.001 second sleep between each send(...)
call, same result.
So, my questions are:
send(...)
, after the HWM is reached (size 1)? Scripts:
import zmq
context = zmq.Context()
push_socket = context.socket(zmq.PUSH)
push_socket.setsockopt(zmq.SNDHWM, 1)
push_socket.setsockopt(zmq.RCVHWM, 1)
push_socket.bind("tcp://127.0.0.1:5557")
print(push_socket.get_hwm()) # Prints 1
print('Sending all messages')
for i in range(2200):
push_socket.send(str(i).encode('ascii'))
print('Finished execution...')
import zmq
import time
context = zmq.Context()
pull_socket = context.socket(zmq.PULL)
pull_socket.setsockopt(zmq.RCVHWM, 1)
pull_socket.setsockopt(zmq.SNDHWM, 1)
pull_socket.connect("tcp://127.0.0.1:5557")
print(pull_socket.get_hwm()) # Prints 1
print('Connected, but not receiving yet... (Sleep 4s)')
time.sleep(4)
print('Receiving everything now!')
rec = ''
for i in range(2200):
rec += '{} '.format(pull_socket.recv().decode('ascii'))
print(rec) # Prints `0 1 2 ... 2198 2199 `
In order to reproduce my test case, open two terminals and launch first puller.py in one and quickly afterwards (4 seconds window) pusher.py in the other one.
There are at least 4 buffers involved here: zmq send buffer, OS write tcp buffer, OS read tcp buffer, and zmq recv buffer.
The zmq io threads mark a message as "sent" when it has successfully been written to the OS tcp write buffer. The messages are now considered "in transit".
Then the network stack takes care of transferring as much as it can into the other process' matching OS recv buffer, Finally, the receiving zmq io thread reads at most HWM messages at a time from this buffer into the ZMQ recv queue.
The OS buffers are by default usually around 10-100kb, and both of these can fill up completely with "in transit" messages before ZMQ even notices that the other side isn't consuming any messages. These buffers are kind of required for performance reasons - you can't just get rid of them.
The solution to your problem probably involves req/rep sockets and an explicit application-level ack i.e. lazy pirate pattern from the guide.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With