
Consume multiple queues in python / pika

I am trying to create a consumer that would subscribe to multiple queues, and then process messages as they arrive.

The problem is that when there is some data already present in the first queue, it consumes the first queue and never goes to consume the second queue. However, when the first queue is empty, it does go to the next queue, and then consumes both queues simultaneously.

I first implemented this with threading, but I would rather steer clear of it if the pika library can handle this for me without much complexity. Below is my code:

    import pika

    mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
    mq_channel = mq_connection.channel()
    mq_channel.basic_qos(prefetch_count=1)


    def callback(ch, method, properties, body):
        print body
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)

    mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
    mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
    mq_channel.start_consuming()
user3295878 asked Jul 01 '14 12:07



1 Answer

One possible solution is to use a non-blocking connection (pika.SelectConnection) and consume messages from its event loop.

    import pika


    def callback(channel, method, properties, body):
        print(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)


    def on_open(connection):
        connection.channel(on_channel_open)


    def on_channel_open(channel):
        channel.basic_consume(callback, queue='queue1')
        channel.basic_consume(callback, queue='queue2')


    parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
    connection = pika.SelectConnection(parameters=parameters,
                                       on_open_callback=on_open)

    try:
        connection.ioloop.start()
    except KeyboardInterrupt:
        connection.close()

This registers a consumer on each queue, and the callback is invoked for messages from either queue as they arrive.
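The code above passes the callback as the first positional argument, which matches older pika releases. For readers on pika 1.0 or later, here is a rough sketch of the same idea, assuming the newer keyword-style signatures (basic_consume(queue=..., on_message_callback=...) and connection.channel(on_open_callback=...)). The names QUEUES, on_message and main are made up for this illustration, the basic_qos call is carried over from the question rather than the answer, and functools.partial is used only so the handler knows which queue a message came from.

```python
import functools

QUEUES = ["queue1", "queue2"]  # queue names from the question


def on_message(queue_name, channel, method, properties, body):
    """Handle one delivery; queue_name is bound per queue via functools.partial."""
    print("%s: %r" % (queue_name, body))
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_channel_open(channel):
    """Register one consumer per queue on the same channel."""
    # Limit unacknowledged deliveries, as the question's code does.
    channel.basic_qos(prefetch_count=1)
    for name in QUEUES:
        channel.basic_consume(
            queue=name,
            on_message_callback=functools.partial(on_message, name),
        )


def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)


def main():
    # Imported here so the callbacks above can be tried without pika or a broker.
    import pika

    # Assumption: a local broker with default credentials, as in the answer.
    parameters = pika.URLParameters("amqp://guest:guest@localhost:5672/%2F")
    connection = pika.SelectConnection(parameters=parameters,
                                       on_open_callback=on_open)
    try:
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Request a clean close, then run the loop until the close completes.
        connection.close()
        connection.ioloop.start()
```

Calling main() starts the consumer. Nothing here has been run against a live broker, so treat it as a starting point and check the signatures against the pika version you have installed.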

Pandikunta Anand Reddy answered Oct 01 '22 09:10
