
Trying to understand ZeroMQ high-water mark behaviour

I have been playing around with pyzmq and simple load balancing using HWM, and I don't quite understand the behaviour I am seeing.

I have set up a simple multithreading test, with a DEALER client connected to two workers via a ROUTER-to-DEALER queue device. HWM is set to 1 on the backend DEALER and on the workers. One of the workers is very fast and the other is very slow, and all the client does is spam 100 messages to the server. This generally seems to work, and the fast worker processes many more messages than the slow worker.
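For reference, the pyzmq helper set_hwm() used on both the backend DEALER and the worker REP sockets sets both the send and the receive high-water mark, and ZeroMQ applies the HWM per connection (per peer) rather than per socket. A minimal check, assuming pyzmq built against libzmq 3 or later:

```python
import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.REP)

# set_hwm() is a pyzmq convenience wrapper: on libzmq >= 3 it sets
# both SNDHWM and RCVHWM to the same value.
sock.set_hwm(1)

snd = sock.getsockopt(zmq.SNDHWM)
rcv = sock.getsockopt(zmq.RCVHWM)
print("SNDHWM:", snd)  # prints: SNDHWM: 1
print("RCVHWM:", rcv)  # prints: RCVHWM: 1

sock.close()
```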

However, even if I make the slow worker so slow that the fast worker should be able to process 99 messages before the slow worker finishes even one, the slow worker still seems to receive at least 2 or 3 messages.
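One rough way to count how many messages can end up committed to the slow worker is sketched below. It is a hypothetical back-of-envelope model, not a pyzmq API. It assumes the inproc rule from the ZeroMQ Guide (the effective HWM of an inproc connection is the sum of the sender's SNDHWM and the receiver's RCVHWM), plus one message that the worker has already received and is sleeping on:

```python
# Hypothetical back-of-envelope model (not a pyzmq API): how many messages
# can be committed to one worker before the DEALER's round-robin starts
# skipping that worker's connection.

def max_committed(dealer_sndhwm, rep_rcvhwm, in_progress=1):
    # Assumption: for inproc, the effective HWM of the connection is the
    # sum of the sender's SNDHWM and the receiver's RCVHWM.
    pipe_capacity = dealer_sndhwm + rep_rcvhwm
    # Plus the message(s) the worker has already recv()'d and is busy with.
    return pipe_capacity + in_progress


print(max_committed(1, 1))  # prints: 3
```

With HWM 1 on both sides this gives 3, which appears consistent with the slow worker receiving messages 1, 3 and 5 in the example output: the DEALER round-robins between the two workers and only skips the slow one once its connection is full.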

Is high-water mark behaviour inexact, or am I missing something?

The server code is as follows:

import sys
import threading
import time

import zmq


def worker_routine(worker_url, worker_id, context=None):
    # socket to talk to dispatcher
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REP)
    socket.set_hwm(1)  # sets both SNDHWM and RCVHWM
    socket.connect(worker_url)

    print("worker", worker_id, "ready ...")
    while True:
        x = socket.recv()

        # worker 1 is the deliberately slow one
        if worker_id == 1:
            time.sleep(3)

        print(worker_id, x.decode())
        sys.stdout.flush()

        socket.send(b'world')


context = zmq.Context.instance()
# socket facing clients
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# socket facing services
backend = context.socket(zmq.DEALER)
url_worker = "inproc://workers"
backend.set_hwm(1)  # set before bind() so it applies to the worker connections
backend.bind(url_worker)

# launch pool of worker threads
for i in range(2):
    thread = threading.Thread(target=worker_routine, args=(url_worker, i))
    thread.start()
    time.sleep(0.1)

try:
    # forward messages between frontend and backend (blocks)
    zmq.device(zmq.QUEUE, frontend, backend)
except:
    print("terminating!")

# we never get here
frontend.close()
backend.close()
context.term()

The client code is as follows:

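import random
import string

import zmq

#  prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5559")

# not used below
inputs = [''.join(random.choice(string.ascii_lowercase) for x in range(12)) for y in range(100)]

for x in range(100):
    # empty delimiter frame, then the payload, so the REP worker can parse the envelope
    socket.send_multipart([b'', str(x).encode()])

print("finished!")

# let queued messages be flushed before exiting (subject to the LINGER setting)
socket.close()
context.term()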

Example output:

...
0 81
0 82
0 83
0 84
0 85
0 86
0 87
0 88
0 89
0 90
0 91
0 92
0 93
0 94
0 95
0 96
0 97
0 98
0 99
1 1
1 3
1 5
TheTaintedOne asked Oct 21 '22 16:10



1 Answer

Apparently ZeroMQ sends messages asynchronously with respect to your send() call. That is to say, when send() returns, the message has not yet been sent, or even added to the internal queue. If you send fast enough, the previous message still hasn't been added to the queue by the time you call send() again, so the watermark has not been reached. You may add tens or hundreds of messages before some of them make it onto the queue, the watermark is reached, and the blocking send behaviour kicks in.
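The point where the blocking send behaviour kicks in can be observed directly by sending with zmq.NOBLOCK and counting how many send() calls succeed before pyzmq raises zmq.Again. A minimal sketch over inproc, assuming a PUSH/PULL pair (not the sockets from the question) and a made-up endpoint name, "inproc://hwm-demo":

```python
import zmq

ctx = zmq.Context.instance()

# Receiver: never reads, so messages pile up in the connection's queue.
pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVHWM, 1)
pull.bind("inproc://hwm-demo")  # hypothetical endpoint name

# Sender: HWM set before connect() so it applies to this connection.
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)
push.connect("inproc://hwm-demo")

accepted = 0
for _ in range(10_000):  # safety cap in case the HWM is never reached
    try:
        # NOBLOCK makes send() raise zmq.Again instead of blocking
        # once the connection's HWM has been reached.
        push.send(b"x", flags=zmq.NOBLOCK)
    except zmq.Again:
        break
    accepted += 1

print("messages accepted before the HWM kicked in:", accepted)

# Discard the undelivered messages instead of waiting on them.
push.close(linger=0)
pull.close(linger=0)
```

Over inproc the count should stay small, since the Guide describes the effective HWM there as the sum of both sides' settings. Over tcp it can be noticeably larger, because messages can also sit in the I/O thread and in the kernel's socket buffers, which the HWM does not count.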

In other words, try sleeping for a fraction of a second after each send() and see what happens. That should give the message enough time to be added to the queue, so that by the time of the next send() the socket can see that the watermark has been reached.
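The suggestion above can be sketched as a paced version of the question's client loop. The helper name send_paced() and the 10 ms delay in the usage note are illustrative assumptions, not values from the answer:

```python
import time

import zmq  # the caller passes in a zmq.DEALER socket


def send_paced(socket, count, delay):
    """Send `count` numbered requests, pausing `delay` seconds after each.

    Hypothetical helper: same frames as the question's client loop.
    """
    for x in range(count):
        # Empty delimiter frame first, then the payload, as in the question.
        socket.send_multipart([b"", str(x).encode()])
        # Give ZeroMQ time to move the message along before the next send.
        time.sleep(delay)
```

For example, replacing the question's send loop with send_paced(socket, 100, 0.01) keeps the rest of the client unchanged.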

aaa90210 answered Oct 23 '22 11:10
