How to consume RabbitMQ messages via pika for some limited time?

All the examples in the pika tutorial end with the client invoking start_consuming(), which starts an infinite loop. These examples work for me.

However, I do not want my client to run forever. Instead, I need my client to consume messages for some time, such as 15 minutes, then stop.

How do I accomplish that?

AlexC asked Nov 17 '14 16:11


1 Answer

You can consume messages one at a time in your own loop. Say you already have a channel and a queue set up. The following checks whether the queue is empty and, if it is not, pops a single message off it.

queue_state = channel.queue_declare(queue, durable=True, passive=True)
queue_empty = queue_state.method.message_count == 0

Declaring a queue that already exists with the passive flag set lets you query its state. Next, we process a message:

if not queue_empty:
    method, properties, body = channel.basic_get(queue, no_ack=True)
    callback_func(channel, method, properties, body)

Here callback_func is our normal callback. Make sure not to register the callback with the queue when you want to process messages this way.

# DO NOT
channel.basic_consume(callback_func, queue, no_ack=True)

Registering a consumer like this makes the manual approach behave unpredictably. I have seen the queue_declare call itself process a message when I had called basic_consume beforehand.
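
To stop after a fixed time, as the question asks, the single-message step can be wrapped in a loop with a deadline. What follows is only a minimal sketch under some assumptions: consume_for and idle_sleep are hypothetical names, not part of pika; the channel is assumed to come from a BlockingConnection, where basic_get returns (None, None, None) when the queue is empty; and the no_ack keyword matches the pika version used above (pika 1.0 and later call it auto_ack).

```python
import time


def consume_for(channel, queue, callback_func, seconds, idle_sleep=0.1):
    """Poll `queue` with basic_get for roughly `seconds`, then return.

    Hypothetical helper: returns the number of messages handled.
    """
    deadline = time.monotonic() + seconds
    handled = 0
    while time.monotonic() < deadline:
        # Assumption: an empty queue yields (None, None, None).
        # On pika >= 1.0, use auto_ack=True instead of no_ack=True.
        method, properties, body = channel.basic_get(queue, no_ack=True)
        if method is None:
            # Nothing waiting: pause briefly, but never past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(idle_sleep, remaining)))
            continue
        callback_func(channel, method, properties, body)
        handled += 1
    return handled
```

For the 15 minutes mentioned in the question, the call would be consume_for(channel, queue, callback_func, 15 * 60). The deadline is only checked between messages, so a slow callback can overrun it by the time it takes to process one message.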

Mike answered Sep 25 '22 17:09