Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python and RabbitMQ - Best way to listen to consume events from multiple channels?

I have two, separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.

For example, I can consume events on one with the following:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()

I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1, and host2) be sufficient?

like image 626
blindsnowmobile Avatar asked Feb 16 '15 20:02

blindsnowmobile


People also ask

Can RabbitMQ have multiple consumers?

RabbitMQ has a plugin for consistent hash exchange. Using that exchange, and one consumer per queue, we can achieve message order with multiple consumers. The hash exchange distributes routing keys among queues, instead of messages among queues. This means all messages with the same routing key will go the same queue.

How do you consume messages from RabbitMQ?

In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.

Can RabbitMQ have multiple queues?

Use multiple queues and consumers Queues are single-threaded in RabbitMQ, and one queue can handle up to about 50 thousand messages. You will achieve better throughput on a multi-core system if you have multiple queues and consumers and if you have as many queues as cores on the underlying node(s).

How do you bind multiple queues in RabbitMQ?

Multiple bindingsIt is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues.


2 Answers

The answer to "what is the best way" depends heavily on your usage pattern of queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.

In each example I'm going to assume exchange is already declared.

Threads

You can consume messages from two queues on separate hosts in single process using pika.

You are right - as its own FAQ states, pika is not thread safe, but it can be used in multi-threaded manner by creating connections to RabbitMQ hosts per thread. Making this example run in threads using threading module looks as follows:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()

I've declared callback_func as a method purely to use ConsumerThread.name while printing message body. It might as well be a function outside the ConsumerThread class.

Processes

Alternatively, you can always just run one process with consumer code per queue you want to consume events.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()

and then run by:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console

If the work you're doing on messages from queues is CPU-heavy and as long as number of cores in your CPU >= number of consumers, it is generally better to use this approach - unless your queues are empty most of the time and consumers won't utilize this CPU time*.

Async

Another alternative is to involve some asynchronous framework (for example Twisted) and running whole thing in single thread.

You can no longer use BlockingConnection in asynchronous code; fortunately, pika has adapter for Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()

This approach would be even better, the more queues you would consume from and the less CPU-bound the work performing by consumers is*.

Python 3

Since you've mentioned pika, I've restricted myself to Python 2.x-based solutions, because pika is not yet ported.

But in case you would want to move to >=3.3, one possible option is to use asyncio with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp or aioamqp.

* - please note that these are very shallow tips - in most cases choice is not that obvious; what will be the best for you depends on queues "saturation" (messages/time), what work do you do upon receiving these messages, what environment you run your consumers in etc.; there's no way to be sure other than to benchmark all implementations

like image 136
Unit03 Avatar answered Oct 17 '22 16:10

Unit03


Below is an example of how I use one rabbitmq instance to listen to 2 queues at the same time:

import pika
import threading

threads=[]
def client_info(channel):    
   channel.queue_declare(queue='proxy-python')
   print (' [*] Waiting for client messages. To exit press CTRL+C')


   def callback(ch, method, properties, body):
       print (" Received %s" % (body))

   channel.basic_consume(callback, queue='proxy-python', no_ack=True)
   channel.start_consuming()

def scenario_info(channel):    
   channel.queue_declare(queue='savi-virnet-python')
   print (' [*] Waiting for scenrio messages. To exit press CTRL+C')


   def callback(ch, method, properties, body):
      print (" Received %s" % (body))

   channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True)
   channel.start_consuming()

def manager():
   connection1= pika.BlockingConnection(pika.ConnectionParameters
  (host='localhost'))
   channel1 = connection1.channel()
  connection2= pika.BlockingConnection(pika.ConnectionParameters
  (host='localhost'))
   channel2 = connection2.channel()
   t1 = threading.Thread(target=client_info, args=(channel1,))
   t1.daemon = True
   threads.append(t1)
   t1.start()  

   t2 = threading.Thread(target=scenario_info, args=(channel2,))
   t2.daemon = True
   threads.append(t2)


   t2.start()
   for t in threads:
     t.join()


 manager()
like image 2
NasimBM Avatar answered Oct 17 '22 15:10

NasimBM