Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Receive a message with RabbitMQ then process it then send back the results

I would like to send a message (directly) from a script and than process it, and send back the results. So it's like a double publish-subscribe.

I have 2 scripts:

  • Processer
  • Client

The Client sends a message directly (simple string) to the Processer, and than the Processer script counts the characters in the string and sends back the results to the client.

This is how I tried to do:

The Processer waits for a message, calculates something and than answers back to the original sender.

#Processer.py:
import pika
import sys

#Sends back the score
#addr: Connection address
#exchName: Exchange name (where to send)
#rKey: Name of the queue for direct messages
#score: The detected score
def SendActualScore(addr, exchName, rKey, score):
    #Send the image thru the created channel with the given routing key (queue name)
    channel.basic_publish(exchange=exchName, routing_key=rKey, body=score)
    print "(*) Sent: " + score

#When we receive something this is called
def CallbackImg(ch, method, properties, body):
    print "(*) Received: " + str(body)
    score = str(len(body))
    #Send back the score
    SendActualScore('localhost', 'valami', rKey, score)


#Subscribe connection
#Receive messages thru this
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#RECEIVE MESSAGES - Subscribe
channel.exchange_declare(exchange='valami', type='direct')
#Define a queue, where we don't need the name
#After we disconnected delete the queue (exclusive flag)
result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue

rKeys = sys.argv[1:]
for rKey in rKeys:
    channel.queue_bind(exchange='valami', queue=queue_name, routing_key = rKey)

channel.basic_consume(CallbackImg, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

The Client just sends the message and than waits for the answer.

#Client.py:
import pika
import sys

connAddr = 'localhost'

#Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters(connAddr))
channel = connection.channel()

#Define an exchange channel, we don't need a queue
channel.exchange_declare(exchange='valami', type='direct')

#Send the image thru the created channel
channel.basic_publish(exchange='valami', routing_key='msg', body='Message in the body')

print "[*] Sent"

def Callback(ch, method, properties, body):
    print "(*) Received: " + str(body)

result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue

channel.queue_bind(exchange='valami', queue=queue_name)

channel.basic_consume(Callback, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

There could be multiple Clients and I don't know how to send back the messages directly to them.

like image 201
Gabe Avatar asked Jun 27 '16 15:06

Gabe


People also ask

Can RabbitMQ replay messages?

As all current RabbitMQ queue types have destructive consume behaviour, i.e. messages are deleted from the queue when a consumer is finished with them, it is not possible to re-read messages that have been consumed. Streams will allow consumers to attach at any point in the log and read from there.

What is ack and NACK in RabbitMQ?

ack is used for positive acknowledgements. basic. nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1) basic.


1 Answers

Have you checked the tutorials for RPC in RabbitMQ w/ python and pika? http://www.rabbitmq.com/tutorials/tutorial-six-python.html


The gist of what you need to do in your client, is found in the RPC tutorial, but with a few modifications.

In your client, you will need to create an exclusive queue - the same way you did in your server.

When you send your message from the client, you need to set the reply_to to the name of the client's exclusive queue

from the tutorial:

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

On the server, when you receive a message, you need to read the reply_to header from the message and then basic_publish the reply to that queue.


Rather than thinking about "client" and "server", it may be helpful to frame this in terms of "message producer" and "message consumer".

In your scenario, you need both of your processes to be both a publisher and consumer. The "client" will publish the original message and consume the response. The "server" will consume the original message and publish a response.

The only real difference in your code will be the use of the reply_to header on the original message. This is the name of the queue to which you should publish the response.

Hope that helps!


P.S. I cover the core outline of this in my RabbitMQ Patterns eBook - both RPC and request / reply like you are needing. The book talks in principles and patterns, not in specific programming language (though I mostly write node.js and don't really know python).

like image 185
Derick Bailey Avatar answered Oct 14 '22 08:10

Derick Bailey