I want to introduce a zmq based logging into a Python program. As I was facing ZMQError: Address in use errors I decided to boil it down to a simple proof of concept. I was able to run the boiled down version, but not to receive any log entries. This is the code I used:
Log Publisher:
import time
import logging
from zmq.log import handlers as zmqHandler
logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
zmqH=zmqHandler.PUBHandler('tcp://127.0.0.1:12344')
logger.addHandler(zmqH)
for i in range(50):
logger.error('error test...')
print "Send error #%s" % (str(i))
time.sleep(1)
Result
Send error #0
Send error #1
Send error #2
Send error #3
Send error #4
...
Log Subscriber:
import time
import zmq
def sub_client():
port = "12344"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
# Generate 30 entries
for i in range (30):
print "Listening to publishers..."
message = socket.recv()
print "Received error #%s: %s" % (str(i), message)
time.sleep(1)
sub_client()
Result
Listening to publishers...
So the subscriber is locked at the call of socket.recv(). I started publisher and subscriber in different consoles. Both processes appear when I use netstat:
C:\>netstat -a -n -o | findstr 12344
TCP 127.0.0.1:12344 0.0.0.0:0 LISTEN 1336
TCP 127.0.0.1:12344 127.0.0.1:51937 ESTABLISHED 1336
TCP 127.0.0.1:51937 127.0.0.1:12344 ESTABLISHED 8624
I fail to see my mistake here, any ideas?
In addition to the problem at hand, how do I use this zmq listener in general.
Do I have to create one instance of the PUBHandler per process and then add it to all instances of logger (logging.getLogger('myapp') creates a own logger instance, right?) or do I have to create an own PUBHandler for all of the different classes I use? As the PUBHandlerclass has a createLock() I assume that it is not thread save...
For completness I want to mention the doc of the PUBHandler class
I am using a python(x,y) distribution at Win7 with python 2.7.10 and pyzmq 14.7.0-14
[update] I ruled out the the windows firewall as the source of the missing packages
The problem is on the subscriber side. Initially a subscriber filters out all messages, until a filter is set. Use socket.setsockopt(opt, value) function to archive this.
The pyZMQ description is not very clear about the use of this function:
getsockopt(opt) get default socket options for new sockets created by this Context
But the documentation of the zmq_setsockopt function is quite clear (see here):
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len)
...
ZMQ_SUBSCRIBE: Establish message filter TheZMQ_SUBSCRIBEoption shall establish a new message filter on aZMQ_SUBsocket. Newly createdZMQ_SUBsockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.
So the solution is to set a filter with socket.setsockopt(zmq.SUBSCRIBE,filter), where filter is the string you want to filter for. Use filter='' to show all messages. A filter like filter='ERROR' will only display the error messages and suppress all other types like WARNING,INFO or DEBUG.
With this the sub_client() function looks like this:
import time
import zmq
def sub_client():
port = "12344"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,'')
# Process 30 updates
print "Listening to publishers..."
for i in range (30):
print "Listening to publishers..."
message = socket.recv()
print "Received error #%s: %s" % (str(i), message)
time.sleep(1)
sub_client()
I know it is an older post, but if someone lands here this is what the subscriber looks like
def sub_client():
port = "12345"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.subscribe("")
# Process 30 updates
for i in range (30):
print("Listening to publishers...")
message = socket.recv()
print("Received error #%s: %s",str(i), message)
time.sleep(1)
sub_client()
And Publisher looks like
import zmq
import logging
import time
from zmq.log.handlers import PUBHandler
port = "12345"
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:%s" % port)
handler = PUBHandler(pub)
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
for i in range(50):
logger.error('error test...')
print("publish error",str(i))
time.sleep(1)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With