I would like to send a message (directly) from a script and than process it, and send back the results. So it's like a double publish-subscribe.
I have 2 scripts:
The Client sends a message directly (simple string) to the Processer, and than the Processer script counts the characters in the string and sends back the results to the client.
This is how I tried to do:
The Processer waits for a message, calculates something and than answers back to the original sender.
#Processer.py:
import pika
import sys
#Sends back the score
#addr: Connection address
#exchName: Exchange name (where to send)
#rKey: Name of the queue for direct messages
#score: The detected score
def SendActualScore(addr, exchName, rKey, score):
#Send the image thru the created channel with the given routing key (queue name)
channel.basic_publish(exchange=exchName, routing_key=rKey, body=score)
print "(*) Sent: " + score
#When we receive something this is called
def CallbackImg(ch, method, properties, body):
print "(*) Received: " + str(body)
score = str(len(body))
#Send back the score
SendActualScore('localhost', 'valami', rKey, score)
#Subscribe connection
#Receive messages thru this
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#RECEIVE MESSAGES - Subscribe
channel.exchange_declare(exchange='valami', type='direct')
#Define a queue, where we don't need the name
#After we disconnected delete the queue (exclusive flag)
result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue
rKeys = sys.argv[1:]
for rKey in rKeys:
channel.queue_bind(exchange='valami', queue=queue_name, routing_key = rKey)
channel.basic_consume(CallbackImg, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
The Client just sends the message and than waits for the answer.
#Client.py:
import pika
import sys
connAddr = 'localhost'
#Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters(connAddr))
channel = connection.channel()
#Define an exchange channel, we don't need a queue
channel.exchange_declare(exchange='valami', type='direct')
#Send the image thru the created channel
channel.basic_publish(exchange='valami', routing_key='msg', body='Message in the body')
print "[*] Sent"
def Callback(ch, method, properties, body):
print "(*) Received: " + str(body)
result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue
channel.queue_bind(exchange='valami', queue=queue_name)
channel.basic_consume(Callback, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
There could be multiple Clients and I don't know how to send back the messages directly to them.
As all current RabbitMQ queue types have destructive consume behaviour, i.e. messages are deleted from the queue when a consumer is finished with them, it is not possible to re-read messages that have been consumed. Streams will allow consumers to attach at any point in the log and read from there.
ack is used for positive acknowledgements. basic. nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1) basic.
Have you checked the tutorials for RPC in RabbitMQ w/ python and pika? http://www.rabbitmq.com/tutorials/tutorial-six-python.html
The gist of what you need to do in your client, is found in the RPC tutorial, but with a few modifications.
In your client, you will need to create an exclusive queue - the same way you did in your server.
When you send your message from the client, you need to set the reply_to
to the name of the client's exclusive queue
from the tutorial:
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
On the server, when you receive a message, you need to read the reply_to
header from the message and then basic_publish
the reply to that queue.
Rather than thinking about "client" and "server", it may be helpful to frame this in terms of "message producer" and "message consumer".
In your scenario, you need both of your processes to be both a publisher and consumer. The "client" will publish the original message and consume the response. The "server" will consume the original message and publish a response.
The only real difference in your code will be the use of the reply_to
header on the original message. This is the name of the queue to which you should publish the response.
Hope that helps!
P.S. I cover the core outline of this in my RabbitMQ Patterns eBook - both RPC and request / reply like you are needing. The book talks in principles and patterns, not in specific programming language (though I mostly write node.js and don't really know python).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With