I'm trying to send a Python dictionary from a Python producer to a Python consumer using RabbitMQ. The producer first establishes a connection to the local RabbitMQ server, then declares the queue to which the message will be delivered, and finally sends the message. The consumer first connects to the RabbitMQ server and makes sure the queue exists by declaring the same queue. It then receives the message from the producer in the callback function and prints the 'id' value (1). Here are the scripts for the producer and the consumer:
producer.py script:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = {'id': 1, 'name': 'name1'}
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
consumer.py script:
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body['id'])
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
But when I run producer.py, I get this error:
line 18, in <module>
    delivery_mode = 2, # make message persistent
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 1978, in basic_publish
    mandatory, immediate)
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 2064, in publish
    immediate=immediate)
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 338, in basic_publish
    (properties, body))
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1571, in _send_method
    self._send_message(channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1596, in _send_message
    content[1][s:e]).marshal())
TypeError: unhashable type
Could anybody help me? Thanks!
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin.
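The round-robin idea can be illustrated with a small simulation. This is only a sketch of the dispatch pattern, not RabbitMQ's actual implementation; the function `dispatch_round_robin` and the worker names are hypothetical and exist only for this example.

```python
from itertools import cycle


def dispatch_round_robin(messages, consumers):
    """Assign each message to the next consumer in sequence (simulation only)."""
    assignments = {consumer: [] for consumer in consumers}
    # cycle() repeats the consumer list endlessly, so message i goes to
    # consumer i % len(consumers).
    for message, consumer in zip(messages, cycle(consumers)):
        assignments[consumer].append(message)
    return assignments


result = dispatch_round_robin(["m1", "m2", "m3", "m4", "m5"],
                              ["worker-1", "worker-2"])
print(result)
```

With five messages and two workers, the first worker ends up with three messages and the second with two, which is as even as the split can be.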
You can't send native Python types as the payload; the message body has to be a string, so you have to serialize the dictionary first. I recommend using JSON:
import json

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=json.dumps(message),
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))
and in the consumer:
def callback(ch, method, properties, body):
    print(" [x] Received %r" % json.loads(body))
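To check the serialization step on its own, without a running broker, here is a minimal round-trip sketch. The helpers `encode_message` and `decode_message` are hypothetical names introduced for illustration; they are not part of pika. The sketch assumes the consumer may receive the body as either bytes or a string, depending on the Python and pika versions in use.

```python
import json


def encode_message(message):
    """Serialize a dict to UTF-8 JSON bytes, usable as basic_publish(body=...)."""
    return json.dumps(message).encode("utf-8")


def decode_message(body):
    """Turn a received body (bytes or str) back into a dict."""
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    return json.loads(body)


message = {'id': 1, 'name': 'name1'}
body = encode_message(message)   # what the producer would publish
received = decode_message(body)  # what the consumer callback would do
print(received['id'])            # the 'id' lookup now works on a dict
```

In the consumer callback, the same decoding would happen before the lookup, so `body['id']` becomes a lookup on the decoded dictionary rather than on the raw body.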