I'd like to selectively delete messages from an AMQP queue without even reading them.
The scenario is as follows:
The sending side wants to expire messages of type X once new information of type X arrives. Because it is very likely that the subscriber has not yet consumed the latest message of type X, the publisher should simply delete the previous X-type messages and put the newest one into the queue. The whole operation should be transparent to the subscriber; in fact, the subscriber should be able to use something as simple as STOMP to get the messages.
How can I do this using AMQP? Or is it more convenient in another messaging protocol?
I'd like to avoid complicated infrastructure. The messaging needed is as simple as described above: one queue, one subscriber, one publisher, but the publisher must be able to delete messages ad hoc according to given criteria.
The publisher client will use Ruby, but I can work with any language once I discover how to do it in the protocol.
Log in to the Management Web UI. For details, see Connecting to the Management Address of a RabbitMQ Instance. On the Queues tab page, click the name of a queue. Click Purge Messages to remove messages from the queue.
You have to make the consumer ack (or nack) them; only after that will they be removed. Alternatively, you can shut down the consumers and purge the queue completely. If you are looking for a way to purge all unacked messages, there is no such feature in either the AMQP protocol or RabbitMQ.
To remove all messages from a particular queue, click the Queues tab. Select a queue, right-click, and then select Remove Messages (with NDR) or Remove Messages (without NDR). A dialog box appears that confirms the selected action and asks "Do you want to continue?". Click Yes.
You do not want a message queue, you want a key-value database. For instance you could use Redis or Tokyo Tyrant to get a simple network-accessible key-value database. Or just use a memcache.
Each message type is a key. When you write a new message with the same key, it overwrites the previous value, so a reader of this database can never get out-of-date information.
At this point, you only need a message queue to establish the order in which keys should be read, if that is important. Otherwise, just continually scan the database. If you do continually scan the database, it is best to put the database near the readers to reduce network traffic.
I would probably do something like this:
key: typecode
value: lastUpdated, important data
Then I would send messages that contain:
typecode, lastUpdated
That way the reader can compare the lastUpdated in the message with the lastUpdated they last read from the database for that key, and skip the read if they are already up to date.
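The scheme described above can be sketched in plain Ruby. This is only an illustration under stated assumptions: a Hash stands in for the key-value database (Redis, Tokyo Tyrant, memcache) and an Array stands in for the message queue, and the `Publisher` and `Reader` classes are hypothetical names, not part of any library.

```ruby
# In-memory stand-ins (assumptions): a Hash plays the key-value database
# and an Array plays the message queue. All class names are hypothetical.
class Publisher
  def initialize(store, queue)
    @store = store
    @queue = queue
  end

  # Overwrite the stored value for this type, then announce the change.
  def publish(typecode, data, last_updated)
    @store[typecode] = { last_updated: last_updated, data: data }
    @queue.push({ typecode: typecode, last_updated: last_updated })
  end
end

class Reader
  attr_reader :seen

  def initialize(store, queue)
    @store = store
    @queue = queue
    @last_read = {} # typecode => lastUpdated of the value fetched last
    @seen = []      # data actually fetched from the store
  end

  # Drain the queue, touching the store only when the notification is newer
  # than what was last read for that key.
  def drain
    while (note = @queue.shift)
      previous = @last_read[note[:typecode]]
      next if previous && previous >= note[:last_updated] # already up to date

      entry = @store[note[:typecode]]
      next if entry.nil?

      @last_read[note[:typecode]] = entry[:last_updated]
      @seen << entry[:data]
    end
  end
end

store = {}
queue = []
publisher = Publisher.new(store, queue)
reader = Reader.new(store, queue)

publisher.publish("X", "x-old", 1)
publisher.publish("Y", "y-only", 2)
publisher.publish("X", "x-new", 3) # replaces "x-old" before the reader wakes up

reader.drain
p reader.seen
```

In this sketch the reader never sees "x-old": the first X notification makes it fetch the current value, and the later X notification is skipped because its lastUpdated is not newer than what was already read.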
If you really need to do this with AMQP, then use RabbitMQ and a custom exchange type, specifically a Last Value Cache Exchange. Example code is here: https://github.com/squaremo/rabbitmq-lvc-plugin
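To give a feel for what a last-value cache does, here is a toy in-memory model in Ruby. It is not the plugin's API and does not talk to RabbitMQ; the `LastValueCacheExchange` class and its methods are hypothetical. It assumes the behaviour, as I understand it, that the exchange remembers the last message per routing key and hands it to a queue at bind time.

```ruby
# Toy model of last-value-cache semantics. NOT the plugin's API; every name
# here is hypothetical, and plain Arrays stand in for queues.
class LastValueCacheExchange
  def initialize
    @last = {}                                          # routing key => latest payload
    @bindings = Hash.new { |hash, key| hash[key] = [] } # routing key => bound queues
  end

  # Binding a queue immediately enqueues the cached value, if there is one.
  def bind(queue, routing_key)
    @bindings[routing_key] << queue
    queue << @last[routing_key] if @last.key?(routing_key)
  end

  # Publishing remembers the payload and routes it to queues already bound.
  def publish(routing_key, payload)
    @last[routing_key] = payload
    @bindings[routing_key].each { |queue| queue << payload }
  end
end

exchange = LastValueCacheExchange.new
exchange.publish("X", "x-old")
exchange.publish("X", "x-new")

late_queue = []
exchange.bind(late_queue, "X") # a late binder gets only the latest X
exchange.publish("X", "x-newer")
p late_queue
```

Note that in this model a queue that is already bound still receives every message, so the cache helps consumers that bind late rather than deleting messages already sitting in a queue.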