
Python ZeroMQ PUSH/PULL -- Lost Messages?

Tags:

python

zeromq

I am trying to use Python with ZeroMQ in PUSH/PULL mode, sending a 4 MB message every few seconds.

For some reason, while it looks like all the messages are sent, ONLY SOME of them appear to have been received by the server. What am I missing here?

Here's the code for the client -- client.py

import zmq
import struct

# Build a 4 MB payload: a 4-byte packed int repeated 1,000,000 times
msgToSend = struct.pack('i', 45) * 1000 * 1000

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:5000")

# print the message size in bytes
print(len(msgToSend))

socket.send(msgToSend)

print("Sent message")

And here is the code for the server -- server.py

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://127.0.0.1:5000")

while True:
    # receive the message
    msg = socket.recv()

    print("Message Size is: {0} [MB]".format(len(msg) // (1000 * 1000)))

What am I missing? How do I guarantee that messages are always sent and not lost?

In case it matters, I am using Ubuntu 10.04 32-bit on a Core Duo machine with 2 GB of RAM.

NOTE: I tried the same example using RabbitMQ and everything works well -- no message is lost. I am perplexed, as I often hear praise for ZeroMQ. Why did it fail where RabbitMQ succeeded?

user3262424 Avatar asked Jul 15 '11 02:07



2 Answers

The problem is that when the program exits, the socket gets closed immediately and garbage collected with an effective LINGER of 0 (i.e. it throws any unsent messages away). This is a problem for larger messages because they take longer to send than it takes for the socket to be garbage collected.

You can work around this by calling time.sleep(0.1) just before the program exits (to delay the socket and context being garbage collected).

socket.setsockopt(zmq.LINGER, -1) (which is the default) should avoid this problem, but it doesn't for some reason that I haven't had time to investigate.
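An alternative to the sleep is to shut the socket and context down explicitly rather than leaving it to garbage collection. The sketch below is only an illustration, assuming a reasonably recent pyzmq: the helper names send_all and run_demo, the random loopback port, and the 10-second receive timeout are made up for the demo and are not part of the original code. It sends a few 4 MB messages and relies on context.term() blocking until the queued messages have been flushed.

```python
import struct
import zmq


def send_all(endpoint, payload, count):
    """PUSH `count` copies of `payload` to `endpoint`, then shut down cleanly."""
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    # -1 asks ZeroMQ to keep queued messages until they have been sent.
    socket.setsockopt(zmq.LINGER, -1)
    socket.connect(endpoint)
    try:
        for _ in range(count):
            socket.send(payload)
    finally:
        socket.close()   # explicit close instead of relying on garbage collection
        context.term()   # blocks until queued messages have been flushed


def run_demo(count=3):
    """Send `count` 4 MB messages over loopback and return the received sizes."""
    payload = struct.pack('i', 45) * 1000 * 1000
    receiver_context = zmq.Context()
    pull = receiver_context.socket(zmq.PULL)
    pull.setsockopt(zmq.RCVTIMEO, 10000)  # raise after 10 s instead of hanging
    port = pull.bind_to_random_port("tcp://127.0.0.1")
    try:
        send_all("tcp://127.0.0.1:{0}".format(port), payload, count)
        return [len(pull.recv()) for _ in range(count)]
    finally:
        pull.close()
        receiver_context.term()


if __name__ == "__main__":
    print(run_demo())
```

In the original client.py, the equivalent change would be to call socket.close() and then context.term() after the final send. Note that with an infinite LINGER, context.term() will wait for as long as no server is reachable.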

cwb Avatar answered Sep 22 '22 08:09



It's conceivable that you are running out of memory (depending on how you are sending messages, whether they are being consumed fast enough, etc.). You can use socket.setsockopt(zmq.HWM) to set the high-water mark (HWM) to a sane value and prevent ZeroMQ from storing too many messages in the outgoing buffer. With this in mind, consider these slightly modified examples:

# server
...
counter = 0
while True:
    msg = socket.recv()  # receive the message
    counter += 1
    print("Total messages received: {0}".format(counter))

# client
socket.setsockopt(zmq.HWM, 8)
for i in range(1000):
    socket.send(msgToSend)

And then running 10 test clients:

for i in {1..10}; do
    python client.py &
done

From the server you can see that all the messages are received:

Total messages received: 9998
Total messages received: 9999
Total messages received: 10000
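On ZeroMQ 3.x and later, the single HWM option was split into separate send and receive limits (zmq.SNDHWM and zmq.RCVHWM), so the client-side setting may need a different constant there. A minimal sketch under that assumption follows; the helper name make_bounded_push and its limit parameter are hypothetical, and the endpoint is simply the one used in the question.

```python
import zmq


def make_bounded_push(context, endpoint, limit=8):
    """Return a PUSH socket that queues at most `limit` outgoing messages per peer."""
    socket = context.socket(zmq.PUSH)
    # Set the limit before connecting so it applies to the new connection.
    socket.setsockopt(zmq.SNDHWM, limit)
    socket.connect(endpoint)
    return socket


if __name__ == "__main__":
    context = zmq.Context()
    socket = make_bounded_push(context, "tcp://127.0.0.1:5000")
    print(socket.getsockopt(zmq.SNDHWM))
    socket.close(linger=0)  # nothing was sent, so drop the socket immediately
    context.term()
```

Once a PUSH socket reaches its send limit, send() blocks until there is room again, which is what keeps the outgoing buffer from growing without bound.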
zeekay Avatar answered Sep 19 '22 08:09
