Following the Pika timed_receive example, I would like to have a client that handles more concurrent requests. My question is whether handle_delivery could somehow be called each time a new message is received, without waiting for the previous handle_delivery call to return.
RabbitMQ includes a wide variety of features that make it useful when building distributed systems that communicate via asynchronous messaging. To get started, reading our RabbitMQ - Getting started guide is a great way to learn more about message queue architecture!
Pika is a Python implementation of the AMQP 0-9-1 protocol for RabbitMQ. This tutorial guides you through installing Pika, declaring a queue, setting up a publisher to send messages to the broker's default exchange, and setting up a consumer to receive messages from the queue.
The basic pipe is unidirectional. You cannot send messages from the consumer back to the producer through the same queue on which the consumer received them. If you want to send messages the other way, your consumer will need to be a producer as well, and your producer will need to be a consumer as well, using a second queue.
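A minimal sketch of that two-queue arrangement, using in-memory `queue.Queue` objects as stand-ins for broker queues so it runs without RabbitMQ. The names `requests`, `replies`, `worker`, and `run_demo` are hypothetical and only illustrate the shape; a real setup would declare two queues on the broker instead.

```python
import queue
import threading

# In-memory stand-ins for two broker queues (hypothetical names).
requests = queue.Queue()   # producer -> consumer
replies = queue.Queue()    # consumer -> producer


def worker():
    """Consume from `requests` and publish each result to `replies`."""
    while True:
        body = requests.get()
        if body is None:          # sentinel: stop consuming
            break
        replies.put(body.upper())


def run_demo(messages):
    """Send messages one way and collect the answers from the other queue."""
    t = threading.Thread(target=worker)
    t.start()
    for m in messages:
        requests.put(m)
    requests.put(None)            # tell the worker to stop
    t.join()
    return [replies.get() for _ in messages]
```

Each side both publishes and consumes, but never on the same queue: the original sender writes to `requests` and reads from `replies`, and the worker does the opposite.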
It looks like the call to handle_delivery is blocking, but you could have it add a secondary handler to the I/O event loop using add_timeout. I think this is what you are looking to do:
"""
Asynchronous AMQP consumer; do our processing via an ioloop timeout
"""
import sys
import time

from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters

connection = None
channel = None


def on_connected(connection):
    print("timed_receive: Connected to RabbitMQ")
    connection.channel(on_channel_open)


def on_channel_open(channel_):
    global channel
    channel = channel_
    print("timed_receive: Received our Channel")
    channel.queue_declare(queue="test", durable=True,
                          exclusive=False, auto_delete=False,
                          callback=on_queue_declared)


class TimingHandler(object):
    count = 0
    last_count = 0

    def __init__(self, delay=0):
        self.start_time = time.time()
        self.delay = delay

    def handle_delivery(self, channel, method, header, body):
        # Return immediately; the ioloop calls self() after self.delay seconds
        connection.add_timeout(self.delay, self)

    def __call__(self):
        self.count += 1
        # Report the receive rate once every 1000 messages
        if not self.count % 1000:
            now = time.time()
            duration = now - self.start_time
            sent = self.count - self.last_count
            rate = sent / duration
            self.last_count = self.count
            self.start_time = now
            print("timed_receive: %i Messages Received, %.4f per second" %
                  (self.count, rate))


def on_queue_declared(frame):
    print("timed_receive: Queue Declared")
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)


if __name__ == '__main__':
    # Connect to RabbitMQ
    host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1'
    connection = SelectConnection(ConnectionParameters(host),
                                  on_connected)
    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Close the connection
        connection.close()
        # Loop until the connection is closed
        connection.ioloop.start()
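Callbacks scheduled on the I/O loop still run one at a time on that loop's thread, so deferring work this way frees handle_delivery but does not make slow processing run in parallel. If the per-message work is slow, one possible variation is to hand each body to a thread pool instead. The following is only a sketch under that assumption: `PooledHandler` and `slow_process` are hypothetical names, it uses only the standard library, and it does not touch the channel at all.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class PooledHandler:
    """Delivery callback that hands each message body to a worker pool."""

    def __init__(self, process, max_workers=4):
        self.process = process    # user-supplied function (an assumption here)
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.futures = []

    def handle_delivery(self, channel, method, header, body):
        # Submit and return at once so the next delivery can be dispatched.
        self.futures.append(self.pool.submit(self.process, body))

    def results(self):
        """Block until every submitted message has been processed."""
        return [f.result() for f in self.futures]


def slow_process(body):
    time.sleep(0.05)              # stand-in for real work
    return body[::-1]
```

`handle_delivery` keeps the same four-argument signature as in the example above, so an instance's bound method could be passed to basic_consume in the same way. With no_ack=True nothing has to be sent back to the broker; if acknowledgements were needed, they would have to be issued from the I/O loop's thread, since pika connections are not thread-safe.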