
RabbitMQ non-blocking consumer

I'm using RabbitMQ in Python to manage several queues between a producer and multiple consumers. In the example on the RabbitMQ website (routing model), the consumers block: they stop at start_consuming() and execute the callback function every time a new "task" arrives in the queue.

My question is: how can I implement my consumer so that it keeps waiting for tasks (so the callback function is called every time something new arrives in the queue) while still being able to execute other work/code at the same time?

Thank you

Hugo Sousa asked Feb 19 '14 13:02




1 Answer

From the Pika FAQ:

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.

So let's create the connection inside the thread:

import threading

import pika


class PikaMassenger():

    exchange_name = '...'

    def __init__(self, *args, **kwargs):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(
            exchange=self.exchange_name, 
            exchange_type='topic')

    def consume(self, keys, callback):
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        for key in keys:
            self.channel.queue_bind(
                exchange=self.exchange_name, 
                queue=queue_name, 
                routing_key=key)

        self.channel.basic_consume(
            queue=queue_name, 
            on_message_callback=callback, 
            auto_ack=True)

        self.channel.start_consuming()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.close()

def start_consumer():

    def callback(ch, method, properties, body):
        print(" [x] %r:%r consumed" % (method.routing_key, body))

    with PikaMassenger() as consumer:
        consumer.consume(keys=[...], callback=callback)


consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()
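
With the consumer running in its own thread, the main thread still needs a safe way to receive the messages. One common approach, sketched below, is to have the callback do nothing but put the body on a `queue.Queue`, which the main thread polls between its other tasks. The helper names `make_callback` and `drain` are hypothetical, and the consumer thread is simulated here so the sketch runs without a broker; in real use the callback would be passed to `consume()` as in `start_consumer()` above.

```python
import queue
import threading


def make_callback(inbox):
    """Build a pika-style callback that only hands the body to another thread."""
    def callback(ch, method, properties, body):
        # Runs in the consumer thread, so keep it short and thread-safe.
        inbox.put(body)
    return callback


def drain(inbox):
    """Return every message currently waiting, without blocking."""
    messages = []
    while True:
        try:
            messages.append(inbox.get_nowait())
        except queue.Empty:
            return messages


if __name__ == "__main__":
    inbox = queue.Queue()
    callback = make_callback(inbox)

    # Stand-in for the consumer thread (assumption: a real one would pass
    # `callback` to consume() instead of calling it directly).
    def fake_consumer():
        for body in (b"task 1", b"task 2"):
            callback(None, None, None, body)

    worker = threading.Thread(target=fake_consumer)
    worker.start()
    worker.join()

    # The main thread is free to do other work and polls when convenient.
    for body in drain(inbox):
        print("handling", body)
```

Because only the consumer thread touches the Pika connection, this stays within the one-connection-per-thread rule quoted from the FAQ.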
adnanmuttaleb answered Sep 21 '22 11:09
