Use of pyzmq's logging handler in Python

I want to introduce zmq-based logging into a Python program. Because I kept hitting ZMQError: Address in use errors, I boiled the problem down to a simple proof of concept. The boiled-down version runs, but the subscriber never receives any log entries. This is the code I used:

Log Publisher:

import time
import logging
from zmq.log import handlers as zmqHandler


logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
zmqH=zmqHandler.PUBHandler('tcp://127.0.0.1:12344')
logger.addHandler(zmqH)
for i in range(50):
    logger.error('error test...')
    print "Send error #%s" % (str(i))
    time.sleep(1)

Result

Send error #0
Send error #1
Send error #2
Send error #3
Send error #4
...

Log Subscriber:

import time
import zmq

def sub_client():
    port = "12344"       
    context = zmq.Context()
    socket = context.socket(zmq.SUB)    
    socket.connect("tcp://127.0.0.1:%s" % port)
    # Generate 30 entries
    for i in range (30):
        print "Listening to publishers..."
        message = socket.recv()
        print "Received error #%s: %s" % (str(i), message)
        time.sleep(1)

sub_client()

Result

Listening to publishers...

So the subscriber blocks in the call to socket.recv(). I started the publisher and the subscriber in different consoles. Both processes appear in the netstat output:

C:\>netstat -a -n -o | findstr 12344
  TCP    127.0.0.1:12344        0.0.0.0:0              LISTEN          1336
  TCP    127.0.0.1:12344        127.0.0.1:51937        ESTABLISHED     1336
  TCP    127.0.0.1:51937        127.0.0.1:12344        ESTABLISHED     8624

I fail to see my mistake here, any ideas?

In addition to the problem at hand, how do I use this zmq handler in general? Do I have to create one instance of PUBHandler per process and then add it to all logger instances (logging.getLogger('myapp') creates its own logger instance, right?), or do I have to create a separate PUBHandler for each of the different classes I use? As the PUBHandler class has a createLock() method, I assume that it is not thread safe...

For completeness, I want to mention the doc of the PUBHandler class.

I am using the Python(x,y) distribution on Win7 with Python 2.7.10 and pyzmq 14.7.0-14.

[update] I ruled out the Windows firewall as the source of the missing packets.

user_na asked Nov 17 '25 11:11


2 Answers

The problem is on the subscriber side. Initially, a subscriber filters out all messages until a filter is set. Use the socket.setsockopt(opt, value) function to achieve this. The pyzmq description is not very clear about the use of this function:

getsockopt(opt) get default socket options for new sockets created by this Context

But the documentation of the zmq_setsockopt function is quite clear (see here):

int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len)

...

ZMQ_SUBSCRIBE: Establish message filter The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

So the solution is to set a filter with socket.setsockopt(zmq.SUBSCRIBE, filter), where filter is the prefix a message must start with in order to be delivered. Use filter='' to receive all messages. A filter like filter='ERROR' will only deliver the error messages and suppress all other types like WARNING, INFO or DEBUG.
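The filtering rule can be modelled without any sockets. The following is only a sketch of the prefix-matching behaviour described in the zmq_setsockopt documentation, not pyzmq code; `is_delivered` is a hypothetical helper name introduced for illustration.

```python
def is_delivered(subscriptions, first_frame):
    """Model of SUB-side filtering.

    A message passes if its first frame starts with any subscribed prefix.
    With no subscriptions at all, nothing is delivered.
    """
    return any(first_frame.startswith(prefix) for prefix in subscriptions)


# A fresh SUB socket has no subscriptions, so everything is dropped.
print(is_delivered([], b"ERROR"))
# The empty prefix matches every message.
print(is_delivered([b""], b"WARNING"))
# A non-empty prefix only lets matching topics through.
print(is_delivered([b"ERROR"], b"ERROR"))
print(is_delivered([b"ERROR"], b"INFO"))
```

The first call models the situation in the question: the subscriber never subscribed to anything, so recv() never returned.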

With this the sub_client() function looks like this:

import time
import zmq

def sub_client():
    port = "12344"       
    context = zmq.Context()
    socket = context.socket(zmq.SUB)    
    socket.connect("tcp://127.0.0.1:%s" % port)
    socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: receive every topic

    # Process 30 updates
    for i in range(30):
        print "Listening to publishers..."
        # PUBHandler sends two frames per record: topic (level name) and text
        topic, message = socket.recv_multipart()
        print "Received error #%s: %s" % (str(i), message)
        time.sleep(1)

sub_client()
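
One detail worth knowing on the receiving side: as far as I can tell from the pyzmq source, PUBHandler publishes each record as a two-part message, with the level name (for example ERROR) as the topic frame followed by the formatted text, which may end with a newline. A small helper can turn the frames returned by socket.recv_multipart() into strings. This is a sketch under that assumption; `decode_log_frames` is a hypothetical name, not part of pyzmq.

```python
def decode_log_frames(frames):
    """Split a [topic, text] frame pair into decoded strings.

    Assumes the two-frame [topic, text] layout; anything else is rejected.
    """
    if len(frames) != 2:
        raise ValueError("expected 2 frames, got %d" % len(frames))
    topic, text = frames
    # Drop a trailing newline if the formatter added one.
    return topic.decode("utf-8"), text.decode("utf-8").rstrip("\n")


# In the subscriber, the frames would come from socket.recv_multipart().
level, text = decode_log_frames([b"ERROR", b"error test...\n"])
print("%s: %s" % (level, text))
```

Rejecting unexpected frame counts makes it obvious when something other than a PUBHandler is publishing on the same port.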
user_na answered Nov 19 '25 00:11


I know this is an older post, but in case someone lands here, this is what the subscriber looks like:

import time
import zmq

def sub_client():
    port = "12345"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:%s" % port)
    socket.subscribe("")  # empty prefix: receive every topic

    # Process 30 updates
    for i in range(30):
        print("Listening to publishers...")
        # PUBHandler sends two frames per record: topic (level name) and text
        topic, message = socket.recv_multipart()
        print("Received error #%s: %s" % (i, message.decode("utf-8")))
        time.sleep(1)

sub_client()

And the publisher looks like this:

import zmq
import logging
import time
from zmq.log.handlers import PUBHandler

port = "12345"       
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:%s" % port)

handler = PUBHandler(pub)
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
for i in range(50):
    logger.error('error test...')
    print("publish error",str(i))
    time.sleep(1)
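
The publisher attaches a single handler to one logger, which touches on the second part of the question. The standard logging module returns the same logger object for the same name, and child loggers propagate records to their parent's handlers, so one handler per process is usually enough. Also, createLock() is inherited from logging.Handler, which uses that lock to serialize calls to emit(), so its presence does not by itself indicate a thread-safety problem. A stdlib-only sketch; `ListHandler` is a hypothetical stand-in for PUBHandler, and the logger names are made up for illustration.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for PUBHandler: collects formatted records."""

    def __init__(self):
        logging.Handler.__init__(self)
        self.records = []

    def emit(self, record):
        self.records.append(self.format(record))


handler = ListHandler()
app_logger = logging.getLogger("myapp")
app_logger.setLevel(logging.ERROR)
app_logger.addHandler(handler)  # attached once per process

# Same name, same object: no new logger instance is created.
same_logger = logging.getLogger("myapp")

# A child logger (for example one per module or class) propagates to "myapp".
child = logging.getLogger("myapp.worker")

same_logger.error("from the shared logger")
child.error("from a child logger")
print(handler.records)
```

With this layout, classes only need to call logging.getLogger with a name under "myapp"; none of them has to know about the handler.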
user8532998 answered Nov 19 '25 02:11


