
zero-mq: socket.recv() call is blocking

Tags: python, zeromq

I am trying to use zero-mq. My requirement is very simple: I want to be able to communicate between two peers in a network. I came across this program in the examples in the book.

$ pub_server.py

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    topic = random.randrange(9999,10005)
    messagedata = random.randrange(1,215) - 80
    print "%d %d" % (topic, messagedata)
    socket.send("%d %d" % (topic, messagedata))
    time.sleep(1)

$ sub_client.py

import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range (5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print ('{} {}'.format(topic, messagedata))

# update_nbr ends at 4 after the loop, so divide by the number of updates (5)
print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/(update_nbr + 1))))

The problem I have with this model is that

string = socket.recv()

blocks until I receive a message. I don't want this to happen. I want the messages to be queued up on the receive side so that I can take them out of the queue (or something similar to this).

Is there some model in zero-mq that allows this?

liv2hak asked Sep 24 '14 08:09




1 Answer

zmq.Socket.recv will not block if you pass the zmq.NOBLOCK flag parameter.

The docs say:

If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.

zmq queues the messages it receives, and each recv() call returns one message until the queue is empty, after which a ZMQError is raised.

zmq.Again, used in the examples below, is the ZMQError subclass that is raised for EAGAIN.

For example:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

    # perform other important stuff
    time.sleep(10)
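
The loop above assumes a socket that already exists. To see the queueing behaviour in isolation, here is a minimal, self-contained sketch. It is written for Python 3 and swaps in a PUSH/PULL pair over inproc (an assumption made for the demo, not part of the question's PUB/SUB setup) so that it runs in one process without waiting for a subscription to reach the publisher. The helper name drain and the endpoint name are made up for illustration.

```python
import zmq


def drain(sock):
    """Collect every message already queued on sock, without blocking."""
    messages = []
    while True:
        try:
            messages.append(sock.recv(flags=zmq.NOBLOCK))
        except zmq.Again:
            # Queue is empty: recv() raised instead of blocking.
            return messages


context = zmq.Context()
sender = context.socket(zmq.PUSH)      # stand-in for the publisher
receiver = context.socket(zmq.PULL)    # stand-in for the subscriber
sender.bind("inproc://drain-demo")     # hypothetical endpoint name
receiver.connect("inproc://drain-demo")

# Queue three messages before the receiver looks at the socket.
for value in range(3):
    sender.send(("10001 %d" % value).encode("ascii"))

first_pass = drain(receiver)    # everything that was waiting
second_pass = drain(receiver)   # nothing left to read
print(first_pass)
print(second_pass)

# The exception raised on an empty queue carries the EAGAIN error number.
again_errno = None
try:
    receiver.recv(flags=zmq.NOBLOCK)
except zmq.Again as exc:
    again_errno = exc.errno
print(again_errno == zmq.EAGAIN)

sender.close(linger=0)
receiver.close(linger=0)
context.term()
```

The first call returns the three queued messages in the order they were sent; the second returns an empty list because nothing is waiting.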

The sub_client.py example could perhaps be written to use non-blocking behaviour like this:

import sys, time
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
received_value_count = 0
do_receive_loop = True
while do_receive_loop:
    try:
        #process all messages waiting on subscribe socket
        while True:
            #check for a message, this will not block
            string = socket.recv(flags=zmq.NOBLOCK)

            #message received, process it
            topic, messagedata = string.split()
            total_value += int(messagedata)
            print ('{} {}'.format(topic, messagedata))

            #check if we have all the messages we want
            received_value_count += 1
            if received_value_count > 4:
                do_receive_loop = False
                break

    except zmq.Again as e:
        #No messages waiting to be processed
        pass

    #Here we can do other stuff while waiting for messages
    #contemplate answer to 'The Last Question'
    time.sleep(15)
    print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))
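
The loop above sleeps for a fixed 15 seconds between checks, so a queued message can sit for that long before it is noticed. One alternative, offered as a sketch and not as the method used in this answer, is pyzmq's Socket.poll(), which waits up to a timeout in milliseconds and returns as soon as a message is ready. The example is written for Python 3 and, as an assumption for the demo, uses a PUSH/PULL pair over inproc so it can run in a single process; the endpoint name is hypothetical.

```python
import zmq

context = zmq.Context()
sender = context.socket(zmq.PUSH)
receiver = context.socket(zmq.PULL)
sender.bind("inproc://poll-demo")      # hypothetical endpoint name
receiver.connect("inproc://poll-demo")

# Nothing has been sent yet: poll() gives up after 100 ms and returns 0.
idle_events = receiver.poll(timeout=100, flags=zmq.POLLIN)

sender.send(b"10001 42")

# A message is queued now: poll() returns with the POLLIN bit set.
ready_events = receiver.poll(timeout=100, flags=zmq.POLLIN)

message = None
if ready_events & zmq.POLLIN:
    # Safe to read without blocking because poll() reported a message.
    message = receiver.recv(flags=zmq.NOBLOCK)

print(idle_events, message)

sender.close(linger=0)
receiver.close(linger=0)
context.term()
```

With this shape, the "other stuff" runs whenever poll() returns 0, and the timeout bounds how long a message can wait before it is read.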

Jeremy Allen answered Oct 07 '22 20:10