Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Asynchronous message handling by Pika RabbitMQ client

Following Pika timed received example, I would like to have a client handling more concurrent requests. My question is, if handle_delivery could be somehow called each time new message is received and not waiting for previous handle_delivery return?

like image 477
marxin Avatar asked Mar 06 '12 14:03

marxin


People also ask

Is RabbitMQ synchronous or asynchronous?

RabbitMQ includes a wide variety of features that make it useful when building distributed systems that communicate via asynchronous messaging. To get started, reading our RabbitMQ - Getting started guide is a great way to learn more about message queue architecture!

What is Pika in RabbitMQ?

Pika is a Python implementation of the AMQP 0-9-1 protocol for RabbitMQ. This tutorial guides you through installing Pika, declaring a queue, setting up a publisher to send messages to the broker's default exchange, and setting up a consumer to recieve messages from the queue. Topics. Prerequisites. Permissions.

What type of consumer is a Pika?

Asynchronous consumer example — pika 1.2.

Is RabbitMQ bidirectional?

The basic pipe is unidirectional. You cannot send messages from the consumer to the producer through the same queue that the consumer received messages from the producer. If you want send messages the other way, your consumer will need to be a producer as well, and your producer will need to be a consumer as well.


1 Answers

It looks like the call to handle_delivery is blocking, but you could have it add a secondary handler to the I/O event loop using add_timeout. I think this is what you are looking to do:

"""
Asyncronous amqp consumer; do our processing via an ioloop timeout
"""

import sys
import time

from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters

connection = None
channel = None


def on_connected(connection):
    print "timed_receive: Connected to RabbitMQ"
    connection.channel(on_channel_open)


def on_channel_open(channel_):
    global channel
    channel = channel_
    print "timed_receive: Received our Channel"
    channel.queue_declare(queue="test", durable=True,
                          exclusive=False, auto_delete=False,
                          callback=on_queue_declared)

class TimingHandler(object):
    count = 0
    last_count = 0

    def __init__(self, delay=0):
        self.start_time = time.time()
        self.delay = delay

    def handle_delivery(self, channel, method, header, body):
        connection.add_timeout(self.delay, self)

    def __call__(self):
        self.count += 1
        if not self.count % 1000:
            now = time.time()
            duration = now - self.start_time
            sent = self.count - self.last_count
            rate = sent / duration
            self.last_count = self.count
            self.start_time = now
            print "timed_receive: %i Messages Received, %.4f per second" %\
                  (self.count, rate)

def on_queue_declared(frame):
    print "timed_receive: Queue Declared"
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)


if __name__ == '__main__':

    # Connect to RabbitMQ
    host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1'
    connection = SelectConnection(ConnectionParameters(host),
                                  on_connected)
    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()

    except KeyboardInterrupt:

        # Close the connection
        connection.close()

        # Loop until the connection is closed
        connection.ioloop.start()
like image 191
Aryeh Leib Taurog Avatar answered Oct 06 '22 05:10

Aryeh Leib Taurog