Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ZMQ socket gracefully termination in Python

I have the following ZMQ script

#!/usr/bin/env python2.6


import signal
import sys
import zmq


context = zmq.Context()
socket = context.socket(zmq.SUB)

def signal_term_handler(signal, fname):
    socket.close()
    sys.exit(0)

def main():
    signal.signal(signal.SIGTERM, signal_term_handler)

    socket.connect('tcp://16.160.163.27:8888')
    socket.setsockopt(zmq.SUBSCRIBE, '')
    print 'Waiting for a message'

    while True:
        (event, params) = socket.recv().split()
        # ... doing something with that data ...

if __name__ == '__main__':
    main()

When I Ctrl-C, I get the following errors:

Traceback (most recent call last):
  File "./nag.py", line 28, in <module>
    main()
  File "./nag.py", line 24, in main
    (event, params) = socket.recv().split()
  File "socket.pyx", line 628, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5616)
  File "socket.pyx", line 662, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5436)
  File "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771)
  File "checkrc.pxd", line 11, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:5863)
KeyboardInterrupt

Now, I thought I handled the closing of the socket, when receiving a termination signal from the user, pretty well, then why do I get this ugly messages. What am I missing.

Note I have done some search on Google and StackOverflow but haven't found anything that fixes this problem.

Thanks.

EDIT To anyone that has gotten this far -- user3666197 has suggested a very-good-and-robust way to handle termination or any exception during the execution.

like image 264
boaz_shuster Avatar asked Oct 06 '14 20:10

boaz_shuster


People also ask

What is ZMQ socket?

ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast.

What is ZMQ Python?

ZeroMQ (also spelled ØMQ, 0MQ or ZMQ) is a high-performance asynchronous messaging library, aimed at use in distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ZeroMQ system can run without a dedicated message broker.

Is ZMQ asynchronous?

ZeroMQ is an asynchronous network messaging library known for its high performance.

How does ZMQ Poller work?

The poller notifies when there's data (messages) available on the sockets; it's your job to read it. When reading, do it without blocking: socket. recv( ZMQ. DONTWAIT) .


3 Answers

Event handling approach

While the demo-code is small, the real-world systems, the more the multi-host / multi-process communicating systems, shall typically handle all adversely impacting events in their main control-loop.

try:
    context = zmq.Context()         # setup central Context instance
    socket  = ...                   # instantiate/configure all messaging archetypes
    # main control-loop ----------- # ----------------------------------------
    #
    # your app goes here, incl. all nested event-handling & failure-resilience
    # ----------------------------- # ----------------------------------------
except ...:
    #                               # handle IOErrors, context-raised exceptions
except Keyboard Interrupt:
    #                               # handle UI-SIG
except:
    #                               # handle other, exceptions "un-handled" above
finally:
    #                               # GRACEFULL TERMINATION
    # .setsockopt( zmq.LINGER, 0 )  #           to avoid hanging infinitely
    # .close()                      # .close()  for all sockets & devices
    #
    context.term()                  #           Terminate Context before exit
like image 170
user3666197 Avatar answered Sep 18 '22 20:09

user3666197


Use SIGINT instead of SIGTERM that should fix it.

http://www.quora.com/Linux/What-is-the-difference-between-the-SIGINT-and-SIGTERM-signals-in-Linux

like image 33
Johan Snowgoose Avatar answered Sep 16 '22 20:09

Johan Snowgoose


Cleaning at exist

One may think of the code bellow! But it's not needed! For the socket closing!

The sockets get closed automatically!

However that's the way to do it manually!

Also i'm listing all the different useful information to understand the implication around the subject of destroying and closing or cleaning!

try:
   context = zmq.Context()
   socket = context.socket(zmq.ROUTER)
   socket.bind(SOCKET_PATH)
   # ....
finally :
    context.destroy() # Or term() for graceful destroy 

Error at KeyboardInterupt and fix

Before going further! Why the error:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
...
    msg = self.recv(flags)
  File "zmq/backend/cython/socket.pyx", line 781, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 817, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 13, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt

It's simply the KeyboardInterrupt error!

Just catching it! Will solve the problem!

For example:

try:
    context = zmq.Context()
    socket = context.socket(zmq.ROUTER)
    socket.bind(SOCKET_PATH)
    # ...
except KeyboardInterrupt:
    print('> User forced exit!')

Bingo the error no more show!

enter image description here

Now no need to terminate the context! It will be done automatically!

Note too: If you don't catch KeyboardInterrupt! And simply make a finally: block and run context.term() alone! The process will hang for ever!

finally:
    socket.close() # assuming one socket within the context
    context.term()

or

finally:
    context.destroy()

Will throw the same error! Which prove the error is the raise up of the keyboard interupt! Which should have been catched from within the library! And thrown again!

Only catching KeyboardInterrupt will do!

except KeyboardInterrupt:
    print('> User forced exit!')
finally:
    context.destroy() # manual (not needed)

Will do! But completly useless to add the finally block! And manually destroy (close socket + terminate)

Let me tell you why!

If in a hurry go to In python no need to clean at exit section all at the end!

How termination work and why

From the zguide: Making-a-Clean-Exit

It states that we need to close all messages! And also all sockets! Only until this, that the termination unblock and make the code exit

And c lang! The api go through zmq_ctx_destroy() and also closing the sockets and destroying the messages!

There is a lot of things to know:

Memory leaks are one thing, but ZeroMQ is quite finicky about how you exit an application. The reasons are technical and painful, but the upshot is that if you leave any sockets open, the zmq_ctx_destroy() function will hang forever. And even if you close all sockets, zmq_ctx_destroy() will by default wait forever if there are pending connects or sends unless you set the LINGER to zero on those sockets before closing them.

The ZeroMQ objects we need to worry about are messages, sockets, and contexts. Luckily it’s quite simple, at least in simple programs:

  • Use zmq_send() and zmq_recv() when you can, as it avoids the need to work with zmq_msg_t objects.
  • If you do use zmq_msg_recv(), always release the received message as soon as you’re done with it, by calling zmq_msg_close().
  • If you are opening and closing a lot of sockets, that’s probably a sign that you need to redesign your application. In some cases socket handles won’t be freed until you destroy the context.
  • When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.

Python api for destroying the context and termination

In pyzmq! The Context.term() make the call to zmq_ctx_destroy()!

The method Context.destroy() on the other hand is not only zmq_ctx_destroy() but it go and close all the sockets of the context! Then call Context.term() which call zmq_ctx_destroy()!

From the python doc

destroy()

note destroy() is not zmq_ctx_destroy()! term() is!

destroy() = context socket close() + term()

destroy(linger=None)

Close all sockets associated with this context and then terminate the context.

Warning destroy involves calling zmq_close(), which is NOT threadsafe. If there are active sockets in other threads, this must not be called.

Parameters linger (int, optional) – If specified, set LINGER on sockets prior to closing them.

term()

term()

Close or terminate the context.

Context termination is performed in the following steps:

  • Any blocking operations currently in progress on sockets open within context shall raise zmq.ContextTerminated. With the exception of socket.close(), any further operations on sockets open within this context shall raise zmq.ContextTerminated.
  • After interrupting all blocking calls, term shall block until the following conditions are satisfied: All sockets open within context have been closed.
  • For each socket within context, all messages sent on the socket have either been physically transferred to a network peer, or the socket’s linger period set with the zmq.LINGER socket option has expired.
  • For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER. This can be called to close the context by hand. If this is not called, the context will automatically be closed when it is garbage collected.

This is useful if you want to manually close!

It depends on the wanted behavior one may want to go with a way or another! term() will raise the zmq.ContextTerminated exception for open sockets operation! If forcing out! One can simply call destroy()! For graceful exit! One can use term()! Then in the catched zmq.ContextTerminated exceptoin block! One should close the sockets! And do any handling! For closing the socket one can use socket.close()! Doing it socket by socket! I wonder what happen if we call destroy() at this point! It may works! The socket will get closed! But then a second call for context.term() will go! it may be ok! It may not! Didn't try it!

LINGER

Check ZMQ_LINGER: Set linger period for socket shutdown title! (ctrl + f)

http://api.zeromq.org/2-1:zmq-setsockopt

The ZMQ_LINGER option shall set the linger period for the specified socket. The linger period determines how long pending messages which have yet to be sent to a peer shall linger in memory after a socket is closed with zmq_close(3), and further affects the termination of the socket's context with zmq_term(3). The following outlines the different behaviours:

  • The default value of -1 specifies an infinite linger period. Pending messages shall not be discarded after a call to zmq_close(); attempting to terminate the socket's context with zmq_term() shall block until all pending messages have been sent to a peer.
  • The value of 0 specifies no linger period. Pending messages shall be discarded immediately when the socket is closed with zmq_close().
  • Positive values specify an upper bound for the linger period in milliseconds. Pending messages shall not be discarded after a call to zmq_close(); attempting to terminate the socket's context with zmq_term() shall block until either all pending messages have been sent to a peer, or the linger period expires, after which any pending messages shall be discarded.

Option value type: int
Option value unit: milliseconds
Default value: -1 (infinite)
Applicable socket types: all

In python no need to clean at exit

You only use destroy() or a combination of term() and destroy() if you want to manually destroy a context! And that's if you want to do some handling given the zmq.ContextTerminated exception! Or while working with multiple contexts! And you are creating them and closing them! Even though generally we never do that! Or some reasons while the code is all right running!

Otherwise as stated in the zguide

This is at least the case for C development. In a language with automatic object destruction, sockets and contexts will be destroyed as you leave the scope. If you use exceptions you’ll have to do the clean-up in something like a “final” block, the same as for any resource.

And you can see it in the pyzmq doc at Context.term() above:

This can be called to close the context by hand. If this is not called, the context will automatically be closed when it is garbage collected.

When the variable run out of scope they get destroyed! And the destroy and exit will be handled automatically! When the program exit ! Let's say even after a finally code! All variables will get destroyed! And so the cleaning will happen there!

Again! If you are having some problems! Make sure it's not contexts, socket and messages closing related! And make sure to use the latest version of pyzmq

like image 28
Mohamed Allal Avatar answered Sep 17 '22 20:09

Mohamed Allal