
Pika: how to consume messages synchronously

I would like to run a process periodically (say once every 10 minutes, or once an hour) that gets all the messages from a queue, processes them, and then exits. Is there a way to do this with pika, or should I use a different Python library?

Alexander Putilin asked Sep 14 '14 15:09



3 Answers

I think the ideal solution here is the basic_get method. It fetches a single message, and returns None if the queue is already empty. The advantage is that you can drain the queue with a simple loop and break out as soon as None is returned. It is also safe to run basic_get with multiple consumers.

This example is based on my own library, amqpstorm, but you could easily implement the same with pika as well.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # The queue is empty, so we are done: break the loop and stop the application.
        break
    print("Message:", result['body'])
    channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
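
For readers who want to stay with pika, the same loop might look roughly like the sketch below. It assumes pika 1.x, where BlockingChannel.basic_get takes auto_ack (older releases used no_ack) and returns (None, None, None) when the queue is empty. The names drain_queue, handle and main, as well as the host and queue name, are illustrative and not part of the original answer.

```python
def drain_queue(channel, queue_name, handle):
    """Fetch messages one at a time with basic_get until the queue is empty.

    `channel` is expected to behave like a pika BlockingChannel.
    Returns the number of messages processed.
    """
    processed = 0
    while True:
        method, properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:
            # Queue is empty: we are done.
            break
        handle(body)
        # Acknowledge only after the handler succeeded.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
    return processed


def main():
    # Imported here so drain_queue can be exercised without pika installed.
    import pika

    # Assumption: a broker on localhost with default credentials.
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    channel = connection.channel()
    channel.queue_declare('simple_queue')
    count = drain_queue(channel, 'simple_queue', lambda body: print("Message:", body))
    print("Processed", count, "message(s).")
    connection.close()
```

Calling main() from a script scheduled with cron (or a similar scheduler) would give the "run every 10 minutes, drain, exit" behaviour asked about in the question.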
eandersson answered Oct 26 '22 11:10


Would this work for you:

  1. Measure the current queue length as N = queue.method.message_count
  2. Make the callback count the processed messages and as soon as N are processed, call channel.stop_consuming.

So, client code would be something like this:

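import pika

class CountCallback(object):
    def __init__(self, count):
        self.count = count

    def __call__(self, ch, method, properties, body):
        # process the message here, then acknowledge it so it is not redelivered
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.count -= 1
        if not self.count:
            ch.stop_consuming()

# Assumption: a broker on localhost with default credentials.
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
queue = channel.queue_declare('tasks')
count = queue.method.message_count
# With an empty queue the callback never fires and start_consuming would block forever.
if count:
    # pika >= 1.0 signature; older releases took the callback as the first argument.
    channel.basic_consume(queue='tasks', on_message_callback=CountCallback(count))
    channel.start_consuming()
conn.close()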
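
One caveat with counting: if another consumer takes some of the N messages first, the counter never reaches zero and the process keeps waiting. A possible alternative, sketched here rather than taken from the answer, is pika's BlockingChannel.consume generator with an inactivity_timeout, which yields (None, None, None) when no message arrives in time. The function name consume_until_idle and the idle_seconds default are illustrative assumptions.

```python
def consume_until_idle(channel, queue_name, handle, idle_seconds=1.0):
    """Process messages until none arrives for idle_seconds.

    `channel` is expected to behave like a pika BlockingChannel.
    Returns the number of messages processed.
    """
    processed = 0
    for method, properties, body in channel.consume(
            queue_name, inactivity_timeout=idle_seconds):
        if method is None:
            # Timeout: the queue stayed quiet for idle_seconds.
            break
        handle(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
    # Cancel the consumer; messages delivered but not yet acknowledged are requeued.
    channel.cancel()
    return processed
```

The trade-off is that each run waits at least idle_seconds after the last message before exiting, so the timeout should be short relative to the scheduling interval.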
bereal answered Oct 26 '22 11:10


This is @eandersson's amqpstorm example from the answer above, updated for amqpstorm 2.6.1:

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # The queue is empty, so we are done: break the loop and stop the application.
        break
    print("Message:", result.body)
    channel.basic.ack(result.method['delivery_tag'])
channel.close()
connection.close()
Fardin Allahverdi answered Oct 26 '22 12:10