I am running code on python to send and receive from RabbitMQ queue from another application where I can't allow threading. This is very newbie question but, is there a possibility to just check if there is message and if there are no any then just quit listening ? How should I change basic "Hello world" example for such task? Currently I've managed to stop consuming if I get a message, but if there are no messages my method receive() just continue waiting. How to force it not to wait if there are no messages? Or maybe wait only for given amount of time?
import pika global answer def send(msg): connection = pika.BlockingConnection(pika.ConnectionParameters()) channel = connection.channel() channel.queue_declare(queue='toJ') channel.basic_publish(exchange='', routing_key='toJ', body=msg) connection.close() def receive(): connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='toM') channel.basic_consume(callback, queue='toM', no_ack=True) global answer return answer def callback(ch, method, properties, body): ch.stop_consuming() global answer answer = body
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.
You can kill connections to the RabbitMQ broker using the rabbitmqctl tool (see the man page) or by using the Web UI. You could also purge and delete the queue which belonged to the rogue consumer. However, you can't kill the consumer process itself using those tools.
RabbitMQ Unacked Messages are the messages that are not Acknowledged. If a consumer fails to acknowledge messages, the RabbitMQ will keep sending new messages until the prefetch value set for the associated channel is equal to the number of RabbitMQ Unacked Messages count.
Ok, I found following solution:
def receive(): parameters = pika.ConnectionParameters(RabbitMQ_server) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='toM') method_frame, header_frame, body = channel.basic_get(queue = 'toM') if method_frame.NAME == 'Basic.GetEmpty': connection.close() return '' else: channel.basic_ack(delivery_tag=method_frame.delivery_tag) connection.close() return body
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With