I have found a similar question, ZeroMQ: HWM on PUSH does not work, but it couldn't solve my problem.
I want to control the number of messages that the push socket queues, but it doesn't work and still queues 1000 messages.
So I want to know how to set the hwm of the push socket. Thanks in advance.
My environment is: libzmq 4.0.4, pyzmq 14.1.0, python 3.3
Here's my code:
server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import random
import zmq
class TestPush(object):
def __init__(self):
self.ctx = zmq.Context()
random.seed()
def run(self):
task_snd = self.ctx.socket(zmq.PUSH)
task_snd.setsockopt(zmq.SNDHWM, 10)
task_snd.bind('tcp://*:53000')
while True:
workload = str(random.randint(1, 100))
task_snd.send(workload.encode('utf-8'))
print('Send {0}'.format(workload))
if __name__ == '__main__':
test_push = TestPush()
test_push.run()
client.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import random
import zmq
class TestPull(object):
def __init__(self):
self.ctx = zmq.Context()
def run(self):
task_rcv = self.ctx.socket(zmq.PULL)
task_rcv.setsockopt(zmq.RCVHWM, 1)
task_rcv.connect('tcp://localhost:53000')
while True:
msg = task_rcv.recv()
print('Receive msg: {0}'.format(msg))
time.sleep(random.randint(2, 3))
if __name__ == '__main__':
test_pull = TestPull()
test_pull.run()
I faced similar problem with ZeroMQ when I tried to set HWM (High Water Mark) on push and pull socket. Even, it was not working with pub and sub socket. I would like to explain what happened and how it got resolved.
I made 2 scripts, first as a sender with push socket and another as a receiver with pull socket. Set HWM as 10 on both the sockets. And inside receiver script, I put a delay of 5 seconds after each message received. Then I ran the sender script with loop of 100 messages (after keeping the receiver as running to receive).
The receiver queue and then the sender's queue will reach the hwm. After that, the sender will stop sending more messages.
The sender sends all the 100 messages and exited. But the receiver has kept processing message one-after for long time till it receives all the messages.
There is something called as Kernel Socket Buffer which seats between both the sender socket and the receiver sockets. Whenever, a process opens a socket, the kernel allots memory space to the tcp sockets, which is by default 128KB. The kernel socket buffer is applicable to the sender and receiver socket (so total buffer will be 128KB + 128KB). My message size was in bytes (an int with some characters). Thus, the total message buffering will be as follows:
Total buffer message = sender socket hwm + kernel socket buffer for the sender socket (128KB) + receiver socket hwm + kernel socket buffer for the sender socket (128KB)
Now, I changed my message length to little more than a 1KB. Then perform the test again and found approx 260 messages sent (as expected), after this the sender stops in interval till the receiver receives some message and starts again.
In order for push socket to keep sending messages even when receiver is not able receive, we can use NOBLOCK option in send routine, but then the number of lost messages by receiver will increase very high. So, better option is to use Poll or timeout and then call send routine with NOBLOCK option.
Please note that you may use SNDBUFF/RCVBUFF of zeromq but OS may not obey it (as in my case it was not working).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With