I am using pika.BlockingConnection in a consumer which performs some tasks for each message. I have also added signal handling so that the consumer dies properly after completely performing all tasks.
While message is being processed and signal is received, I just get "signal received" from the function, but the code does not exit. So, I decided to check for signal received at the end of callback function, too. The question is, how many times do I check for the signal, as there will be many more functions in this code. Is there a better way of handling signals without overdoing things?
import signal
import sys
import pika
from time import sleep
received_signal = False
all_over = False
def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test'))
mq_channel = mq_connection.channel()
def callback(ch, method, properties, body):
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
    print body
    sleep(50)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)
    print "Message consumption complete"
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
try:
    print ' [*] Waiting for messages. To exit press CTRL+C'
    mq_channel.basic_consume(callback, queue='test')
    mq_channel.start_consuming()
except Exception:
    mq_channel.close()
    exit()
This is my first question here, so let me know if any more details are required.
I think this does what you're looking for:
#!/usr/bin/python
import signal
import sys 
import pika
from contextlib import contextmanager
received_signal = False
processing_callback = False
def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True
    if not processing_callback:
         sys.exit()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
@contextmanager
def block_signals():
    global processing_callback
    processing_callback = True
    try:
        yield
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()
def callback(ch, method, properties, body):
    with block_signals:
        print body
        sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't.
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"
if __name__ == "__main__":    
    try:
        mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        mq_channel = mq_connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        mq_channel.basic_consume(callback, queue='test')
        mq_channel.start_consuming()
    except Exception as e:
        mq_channel.close()
        sys.exit()
I used a contextmanager to handle blocking the signals, so that all the logic is hidden away outside of the callback itself. This should also make it easier to reuse the code. Just to clarify how it's working, it's equivalent to this:
def callback(ch, method, properties, body):
    global processing_callback
    processing_callback = True
    try:
        print body
        sum(xrange(0, 200050000))
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With