Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I use Pika to send and receive RabbitMQ messages?

I'm having some issue getting Pika to work with routing keys or exchanges in a way that's consistent with it AMQP or RabbitMQ documentation. I understand that the RabbitMQ documentation uses an older version of Pika, so I have disregarded their example code.

What I'm trying to do is define a queue, "order" and have two consumers, one that handle the exchange or routing_key "production" and one that handles "test". From looking at that RabbitMQ documentation that should be easy enough to do by using either a direct exchange and routing keys or by using a topic exchange.

Pika however doesn't appear to know what to do with the exchanges and routing keys. Using the RabbitMQ management tool to inspect the queues, it's pretty obvious that Pika either didn't queue the message correctly or that RabbitMQ just threw it away.

On the consumer side it isn't really clear how I should bind a consumer to an exchange or handle routing keys and the documentation isn't really helping.

If I drop all ideas or exchanges and routing keys, messages queue up nicely and are easily handled by my consumer.

Any pointers or example code people have would be nice.

like image 917
Simon Avatar asked Dec 16 '22 15:12

Simon


1 Answers

As it turns out, my understanding of AMQP was incomplete.

The idea is as following:

Client:

The client after getting the connection should not care about anything else but the name of the exchange and the routing key. That is we don't know which queue this will end up in.

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))

Consumer

When the channel is open, we declare the exchange and queue

channel.exchange_declare(exchange='order', 
                         type="topic", 
                         durable=True, 
                         auto_delete=False)

channel.queue_declare(queue="test", 
                      durable=True, 
                      exclusive=False, 
                      auto_delete=False, 
                      callback=on_queue_declared)

When the queue is ready, in the "on_queue_declared" callback is a good place, we can bind the queue to the exchange, using our desired routing key.

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test') 

Messages send to the "order" exchange with the routing key "order.test.customer" will now be routed to the "test" queue, where the consumer can pick it up.

like image 155
Simon Avatar answered Jan 09 '23 17:01

Simon