Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can Kombu do publish and sucscribe to multiple consumers

Using Kombu with RabbitMQ to implement a classic publish/subscribe design pattern. I have created a producer that creates a topic:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

I then created a consumer to consume from the publisher:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

In then launch two consumers, each one in a separate terminal; both wait for a message:

terminal 1: python consumer.py
terminal 2: python consumer.py

When I run the producer, only one consumer receives the message.

like image 830
user3376274 Avatar asked Jan 11 '23 04:01

user3376274


2 Answers

The producer publishes in an exchange, not in a queue. The queues are defined by the consumers. When using different queue name for each consumer then all will get the message. When using many consumers for the same queue then it is load balancing, that's why only one of your consumers gets the message.

like image 88
meili Avatar answered Jan 13 '23 06:01

meili


To clarify, the messages in the queue are 'consumed' i.e. the first consumer consumes it, and the message is no more in the queue, that's why the second consumer isn't getting anything.

To have 2 separate consumers for same message - use 2 separate queues i.e. video_queue1 and video_queue2, declared and bound to the exchange media_exchange, using same key video.

producer.py

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue1 = Queue('video1', exchange=media_exchange, routing_key='video')
video_queue2 = Queue('video2', exchange=media_exchange, routing_key='video')


with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue1, video_queue2])

consumer1.py

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video1', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

consumer2.py

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video2', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
like image 30
Nabeel Ahmed Avatar answered Jan 13 '23 07:01

Nabeel Ahmed