I have multiple distributed competing consumers each pulling messages off the same (transactional) queue. I want to implement each consumer as an Idempotent Receiver so I never process the same message more than once (across all consumers) even if a duplicate arrives. How can I accomplish this with multiple consumers?
My first thought is to somehow generate a consecutive sequence number for each message before putting them on the queue and then use a shared database table to coordinate the work between consumers. I.e. consumer#1 processes msg#1 and then writes a row to DB table saying 'msg#1 is processed' (want it in a database to ensure durability). When a consumer is ready to process a message, it peeks at the next one available in the queue, consults the shared DB table and determines if this is the next msg in order. If so, it pulls it off the queue. If not, it ignores it.
In this way, I only need to store the last message processed (as there is a consecutive sequence number for all msgs), I don't need to use a buffer storing IDs of all messages received with a negotiated 'window' size, and the messages are always processed serially (which is what I want for this scenario).
Just curious if there is a better way? I'm concerned about the cost of querying the database whenever I need to process a message.
If the answer is "it depends on the framework", then I had MSMQ in mind
You can make a consumer idempotent by recording in the database the IDs of the messages that it has processed successfully. When processing a message, a consumer can detect and discard duplicates by querying the database. There are a couple of different places to store the message IDs.
To implement the Idempotent Consumer pattern the recommended approach is to add a table to the database to track processed messages. Each message needs to have a unique messageId assigned by the producing service, either within the payload, or as a Kafka message header.
The Idempotent Message Validator ensures that only unique messages continue through a flow's execution by checking the unique ID of the incoming message. You can use any incoming attribute of the Mule message as an ID, or you can compute the ID by configuring a DataWeave expression.
Enable multiple concurrent consumers to process messages received on the same messaging channel. With multiple concurrent consumers, a system can process multiple messages concurrently to optimize throughput, to improve scalability and availability, and to balance the workload.
The point of idempotent receiver is that is does not matter if a message is processed several times. Hence, idempotent receivers don't need to somehow detect that a message is a duplicate, they can simply process it as usual ...
So either your receiver is not idempotent, or you are worrying needlessly ...
I've accomplished idempotent messages by ensuring each message has a GUID or other unique identifier and then recording it in the same transaction as which you alter the state in your persistence store.
For each message you can now check if the unique id exists in your persistence store.
If the unique id exists, you know it was processed previously and state changes were persisted in the same transaction.
If the unique id does not exist, you know that it has never been processed.
If two consumers process the same message, because your table where you store your processed unique id's has a unique constraint, when it comes time for both consumers to commit their transactions, one of them must fail and rollback all of it's changes while the other will succeed.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With