I would like to run a process periodically(like once per 10 minutes, or once per hour) that gets all the messages from queue, processes them and then exits. Is there any way to do this with pika
or should I use a different python lib?
The blocking connection adapter module implements blocking semantics on top of Pika's core AMQP driver. While most of the asynchronous expectations are removed when using the blocking connection adapter, it attempts to remain true to the asynchronous RPC nature of the AMQP protocol, supporting server sent RPC commands.
You will have to cancel the consumer in your on_request method. You can also use this method to consume messages which allows an inactivity_timeout to be set, where you could then cancel your consumer.
When a consumer (subscription) is registered, messages will be delivered (pushed) by RabbitMQ using the basic. deliver method. The method carries a delivery tag, which uniquely identifies the delivery on a channel. Delivery tags are therefore scoped per channel.
Pika is a Python implementation of the AMQP 0-9-1 protocol for RabbitMQ. This tutorial guides you through installing Pika, declaring a queue, setting up a publisher to send messages to the broker's default exchange, and setting up a consumer to recieve messages from the queue.
I think an ideal solution here would be to use the basic_get method. It will fetch a single message, but if the the queue is already empty it will return None
. The advantage of this is that you can clear the queue with a simple loop, and then simply break the loop once None
is returned, plus it is safe to run basic_get with multiple consumers.
This example is based on my own library; amqpstorm, but you could easily implement the same with pika as well.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
result = channel.basic.get(queue='simple_queue', no_ack=False)
if not result:
print("Channel Empty.")
# We are done, lets break the loop and stop the application.
break
print("Message:", result['body'])
channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
Would this work for you:
N = queue.method.message_count
N
are processed, call channel.stop_consuming
.So, client code would be something like this:
class CountCallback(object):
def __init__(self, count):
self.count = count
def __call__(self, ch, method, properties, body):
# process the message here
self.count -= 1
if not self.count:
ch.stop_consuming()
channel = conn.channel()
queue = channel.queue_declare('tasks')
callback = CountCallback(queue.method.message_count)
channel.basic_consume(callback, queue='tasks')
channel.start_consuming()
@eandersson
This example is based on my own library; amqpstorm, but you could easily implement the same with pika as well.
updated for amqpstorm 2.6.1 :
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
result = channel.basic.get(queue='simple_queue', no_ack=False)
if not result:
print("Channel Empty.")
# We are done, lets break the loop and stop the application.
break
print("Message:", result.body)
channel.basic.ack(result.method['delivery_tag'])
channel.close()
connection.close()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With