Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using Kombu ConsumerMixin, how to declare multiple bindings?

I have a RabbitMQ topic exchange named experiment. I'm building a consumer where I'd like to receive all messages whose routing key begins with "foo" and all messages whose routing key begins with "bar".

According to the RabbitMQ docs, and based on my own experimentation in the management UI, it should be possible to have one exchange, one queue, and two bindings (foo.# and bar.#) that connect them.

I can't figure out how to express this using Kombu's ConsumerMixin. I feel like I should be able to do:

q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#'])

...but it does not like that at all. I've also tried:

q.bind_to(exchange=exchange, routing_key='foo.#')
q.bind_to(exchange=exchange, routing_key='bar.#')

...but every time I try I get:

kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel

...which I guess manes sense. However I can't see a place in the mixin's interface where I can easily hook onto the queues once they are bound to the channel. Here's the base (working) code:

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin


class Worker(ConsumerMixin):
    exchange = Exchange('experiment', type='topic')
    q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.q], callbacks=[self.on_task])]

    def on_task(self, body, message):
        print body
        message.ack()


if __name__ == '__main__':
    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        worker = Worker(conn)
        worker.run()

...which works, but only gives me foo messages. Other than creating a new Queue for each routing key I'm interested in and passing them all to the Consumer, is there a clean way to do this?

like image 958
smitelli Avatar asked Jan 07 '23 02:01

smitelli


1 Answers

After digging a little bit, I found a way to accomplish this that is fairly close to the first idea I had. Instead of passing a routing_key string to the Queue, pass a bindings list. Each element in the list is an instance of a binding object that specifies the exchange and the routing key.

An example is worth a thousand words:

from kombu import Exchange, Queue, binding

exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, bindings=[
    binding(exchange, routing_key='foo.#'),
    binding(exchange, routing_key='bar.#')
], exclusive=True)

And it works great!

like image 193
smitelli Avatar answered Jan 25 '23 11:01

smitelli