I'm using RabbitMQ in Python to manage several queues between a producer and multiple consumers. In the example in RabbitMQ website (routing model), the consumers are blocked. It means that they stop on start_consuming() and execute the callback function every time there is a new "task" in the queue.
My question is: how can I implement my consumer in a way that he is still waiting for tasks (so, the callback function is called every time there is new things in the queue) but at the same time he can execute other work/code.
Thank you
Applications can subscribe to have RabbitMQ push enqueued messages (deliveries) to them. This is done by registering a consumer (subscription) on a queue. After a subscription is in place, RabbitMQ will begin delivering messages. For each delivery a user-provided handler will be invoked.
RabbitMQ has a plugin for consistent hash exchange. Using that exchange, and one consumer per queue, we can achieve message order with multiple consumers. The hash exchange distributes routing keys among queues, instead of messages among queues. This means all messages with the same routing key will go the same queue.
Overview. When a channel is consuming from a queue, there are various reasons which could cause the consumption to stop. One of these is obviously if the client issues a basic. cancel on the same channel, which will cause the consumer to be cancelled and the server replies with a basic.
Form FAQ:
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads,
So lets create the connection inside the thread:
import pika
class PikaMassenger():
exchange_name = '...'
def __init__(self, *args, **kwargs):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.conn.channel()
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type='topic')
def consume(self, keys, callback):
result = self.channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
for key in keys:
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name,
routing_key=key)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True)
self.channel.start_consuming()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.conn.close()
def start_consumer():
def callback(ch, method, properties, body):
print(" [x] %r:%r consumed" % (method.routing_key, body))
with PikaMassenger() as consumer:
consumer.consume(keys=[...], callback=callback)
consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With