I'm using RabbitMQ's round-robin dispatch to distribute messages among multiple consumers, so that only one of them receives any given message.
My problem is that my messages represent tasks and I would like to have local sessions (state) on my consumers. I know beforehand which messages belong to which session but I don't know what is the best way (or is there a way?) to make RabbitMQ dispatch to consumers using an algorithm I specify.
I don't want to write my own orchestration service because it would become a bottleneck, and I don't want my producers to know which consumer will take their messages because I would lose the decoupling I get from RabbitMQ.
Is there a way to make RabbitMQ dispatch my messages to consumers based on a pre-defined algorithm/rule instead of round robin?
Clarification: I use several microservices written in different languages and each service has its own job. I communicate between them using protobuf messages. I give each new message a UUID. If a consumer receives a message it can create a response message from it (this might not be the correct terminology since the producers and consumers are decoupled and they don't know about each other) and this UUID is copied to the new message. This forms a data transformation pipeline and this "process" is identified by the UUID (the processId). My problem is that I may have multiple worker consumers and I need a worker to stick to a UUID if it has seen it before. I have this need because
Since RabbitMQ distributes tasks between workers using round robin I can't force my processes to stick to a worker. I have several caveats:
If there is a workaround which does not involve changing the round robin algorithm and does not break my constraints it is also OK!
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin.
In RabbitMQ, a producer never sends a message directly to a queue. Instead, it uses an exchange as a routing mediator. Therefore, the exchange decides if the message goes to one queue, to multiple queues, or is simply discarded.
A RabbitMQ queue is a sequential (FIFO) data structure: items are enqueued at the tail and dequeued from the head.
If you don't want to go for an orchestration service, you can try a topology like this instead:
For simplicity's sake I assume that your processId is used as the routing key (in the real world you may want to store it in a header and use a headers exchange instead).
An incoming message will be accepted by the Incoming Exchange (type: direct), which has its alternate-exchange argument set to point to the No Session Exchange (type: fanout).
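The declarations described above can be sketched roughly as follows. This is a minimal sketch, not a definitive setup: it assumes a pika-style channel object (one exposing `exchange_declare`, `queue_declare` and `queue_bind` with pika's keyword arguments), and the exchange and queue names are hypothetical placeholders.

```python
INCOMING_EXCHANGE = "incoming"        # hypothetical name
NO_SESSION_EXCHANGE = "no-session"    # hypothetical name
NO_SESSION_QUEUE = "no-session"       # hypothetical name


def declare_topology(channel):
    """Declare both exchanges and the shared queue on a pika-style channel."""
    # Fanout exchange that receives whatever the direct exchange cannot route.
    channel.exchange_declare(exchange=NO_SESSION_EXCHANGE, exchange_type="fanout")

    # Shared queue read by every consumer; it catches messages without a session.
    channel.queue_declare(queue=NO_SESSION_QUEUE, durable=True)
    channel.queue_bind(queue=NO_SESSION_QUEUE, exchange=NO_SESSION_EXCHANGE)

    # Direct exchange that producers publish to. Messages whose routing key
    # matches no binding are handed to the alternate exchange instead of
    # being dropped.
    channel.exchange_declare(
        exchange=INCOMING_EXCHANGE,
        exchange_type="direct",
        arguments={"alternate-exchange": NO_SESSION_EXCHANGE},
    )
```

With pika, `channel` would typically come from `pika.BlockingConnection(...).channel()`. The fallback exchange is declared first so that it already exists when the Incoming Exchange starts referring to it.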
Here is what the RabbitMQ docs say on Alternate Exchanges:
It is sometimes desirable to let clients handle messages that an exchange was unable to route (i.e. either because there were no bound queues or no matching bindings).
Typical examples of this are
- detecting when clients accidentally or maliciously publish messages that cannot be routed
- "or else" routing semantics where some messages are handled specially and the rest by a generic handler
RabbitMQ's Alternate Exchange ("AE") feature addresses these use cases.
(We are particularly interested in the "or else" use case here.)
Each consumer will create its own queue and bind it to the Incoming Exchange, using the processId(s) of the session(s) it is aware of so far as the binding's routing key.
This way it will only get messages for the sessions it is interested in.
In addition, all the consumers will consume from the shared No Session Queue.
If a message with a previously unknown processId comes in, there will be no specific binding for it registered with the Incoming Exchange, so it will get re-routed to the No Session Exchange => No Session Queue and get dispatched to one of the consumers in the usual (round-robin) manner.
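To make the routing behaviour concrete, here is a toy in-memory model of it. It does not talk to RabbitMQ at all; the `Topology` class and the worker names are purely illustrative, and it assumes the receiving consumer registers its binding immediately.

```python
from itertools import cycle


class Topology:
    """Toy model of the direct exchange plus its alternate-exchange fallback."""

    def __init__(self, consumers):
        self.bindings = {}              # processId -> consumer that owns the session
        self._next = cycle(consumers)   # round-robin over the shared queue's consumers

    def publish(self, process_id):
        """Return the consumer that would receive a message with this routing key."""
        owner = self.bindings.get(process_id)
        if owner is not None:
            return owner                # matched a binding on the Incoming Exchange
        # No binding: the message falls through to the No Session Queue and is
        # handed to the next consumer in round-robin order.
        consumer = next(self._next)
        # That consumer opens a session by binding its own queue to the processId.
        self.bindings[process_id] = consumer
        return consumer

    def close_session(self, process_id):
        """Remove the binding so the processId is treated as unknown again."""
        self.bindings.pop(process_id, None)


topology = Topology(["worker-1", "worker-2"])
deliveries = [(pid, topology.publish(pid)) for pid in ["a", "b", "a", "c", "b"]]
print(deliveries)
# [('a', 'worker-1'), ('b', 'worker-2'), ('a', 'worker-1'), ('c', 'worker-1'), ('b', 'worker-2')]
```

Repeated processIds stay on the worker that first saw them, while new ones are spread round-robin. On a real broker the binding is not created instantly, so messages for a new processId that arrive before the binding is in place would still go through the No Session Queue.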
A consumer will then register a new binding for it with the Incoming Exchange (i.e. start a new "session"), so that it will then be getting all the subsequent messages with this processId.
Once the session is over, the consumer will have to remove the corresponding binding (i.e. close the "session").
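The consumer side of this lifecycle might look something like the sketch below. It assumes a pika-style channel and that the processId travels as the routing key; the `SessionWorker` class and the exchange and queue names are hypothetical, and the actual task handling is left out.

```python
INCOMING_EXCHANGE = "incoming"      # hypothetical name
NO_SESSION_QUEUE = "no-session"     # hypothetical name


class SessionWorker:
    """Binds its private queue per processId and unbinds when a session ends."""

    def __init__(self, channel):
        self.channel = channel
        # Private, server-named queue that only this consumer reads from.
        result = channel.queue_declare(queue="", exclusive=True)
        self.queue = result.method.queue
        self.sessions = set()

    def start(self):
        # Read both the private queue and the shared fallback queue.
        self.channel.basic_consume(queue=self.queue, on_message_callback=self.on_message)
        self.channel.basic_consume(queue=NO_SESSION_QUEUE, on_message_callback=self.on_message)

    def on_message(self, channel, method, properties, body):
        process_id = method.routing_key
        if process_id not in self.sessions:
            self.open_session(process_id)   # first message seen for this processId
        # ... task-specific work using local session state would go here ...
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def open_session(self, process_id):
        # From now on the Incoming Exchange routes this processId to our queue.
        self.channel.queue_bind(
            queue=self.queue, exchange=INCOMING_EXCHANGE, routing_key=process_id
        )
        self.sessions.add(process_id)

    def close_session(self, process_id):
        # Removing the binding sends later messages back to the fallback path.
        self.channel.queue_unbind(
            queue=self.queue, exchange=INCOMING_EXCHANGE, routing_key=process_id
        )
        self.sessions.discard(process_id)
```

With pika's blocking adapter, calling `start()` and then `channel.start_consuming()` would run the worker. How the worker decides that a session is over is application-specific and not shown here.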