I am using Kombu with RabbitMQ to implement a classic publish/subscribe design pattern. I have created a producer that publishes to a topic exchange:
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue])
I then created a consumer to consume from the publisher:
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
I then launch two consumers, each one in a separate terminal; both wait for a message:
terminal 1: python consumer.py
terminal 2: python consumer.py
When I run the producer, only one consumer receives the message.
The producer publishes to an exchange, not to a queue. The queues are normally defined by the consumers. If each consumer uses a different queue name, all of them get the message. If several consumers share the same queue, the broker load-balances between them, which is why only one of your consumers gets the message.
To clarify, the messages in a queue are consumed: once the first consumer takes a message, it is no longer in the queue, so the second consumer gets nothing.
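The difference between one shared queue and one queue per consumer can be sketched with a toy in-memory model. This is not Kombu or RabbitMQ code: `ToyBroker`, `shared_queue_demo` and `separate_queues_demo` are hypothetical names invented for illustration, and the simple round-robin here only approximates how a real broker distributes messages among consumers of the same queue.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Toy in-memory model of exchange -> queue -> consumer routing."""

    def __init__(self):
        self.queues = defaultdict(deque)     # queue name -> pending messages
        self.bindings = defaultdict(set)     # routing key -> bound queue names
        self.consumers = defaultdict(list)   # queue name -> consumer names

    def bind(self, queue, routing_key):
        self.bindings[routing_key].add(queue)

    def subscribe(self, queue, consumer):
        self.consumers[queue].append(consumer)

    def publish(self, body, routing_key):
        # The exchange copies the message into every matching queue.
        for queue in self.bindings[routing_key]:
            self.queues[queue].append(body)

    def deliver(self):
        """Hand each pending message to exactly one consumer of its queue."""
        received = defaultdict(list)
        for queue, pending in self.queues.items():
            consumers = self.consumers[queue]
            turn = 0
            while pending and consumers:
                consumer = consumers[turn % len(consumers)]
                # Consuming removes the message from the queue.
                received[consumer].append(pending.popleft())
                turn += 1
        return dict(received)


def shared_queue_demo():
    # Two consumers on the same queue: the message goes to only one of them.
    broker = ToyBroker()
    broker.bind("video", "video")
    broker.subscribe("video", "consumer1")
    broker.subscribe("video", "consumer2")
    broker.publish("Hello World!", "video")
    return broker.deliver()


def separate_queues_demo():
    # One queue per consumer, same routing key: each queue gets its own copy.
    broker = ToyBroker()
    for queue, consumer in [("video1", "consumer1"), ("video2", "consumer2")]:
        broker.bind(queue, "video")
        broker.subscribe(queue, consumer)
    broker.publish("Hello World!", "video")
    return broker.deliver()


print(shared_queue_demo())
print(separate_queues_demo())
```

In this model the shared-queue demo delivers the message to a single consumer, while the separate-queues demo delivers one copy to each consumer, which mirrors the behaviour described above.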
To have two separate consumers receive the same message, use two separate queues, i.e. video_queue1 and video_queue2, declared and bound to the exchange media_exchange using the same routing key video.
producer.py
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
# Two queues bound to the same exchange with the same routing key
video_queue1 = Queue('video1', exchange=media_exchange, routing_key='video')
video_queue2 = Queue('video2', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue1, video_queue2])
consumer1.py
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video1', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    # Acknowledge so the broker can drop the message from this queue
    message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
consumer2.py
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video2', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    # Acknowledge so the broker can drop the message from this queue
    message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()