
RabbitMQ reordering messages

RabbitMQ ticks all the boxes for the project I am planning, save one. I would have different workers listening on a queue and it is important that they process the newest messages (i.e., latest sequence number) first (LIFO).

My application is such that newer messages pretty much obsolete older messages. If you have workers to spare you could still process the older messages but it is important the newer ones are done first.

After trawling the various forums and such, I can see only one solution: before processing a message, a client should first:

  • consume all messages
  • re-order them according to the sequence number
  • re-submit to the queue
  • consume the first message

This is ugly, and problematic if the client dies halfway. But maybe somebody here has a better solution.
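The four steps above can be sketched as follows. This is only an illustration: a plain in-memory `deque` stands in for the broker queue, and the `seq` field and the `take_newest` helper are hypothetical names, not part of RabbitMQ or any client library.

```python
from collections import deque


def take_newest(queue):
    """Drain the queue, sort by sequence number, resubmit, take the first."""
    drained = []
    while queue:
        drained.append(queue.popleft())   # 1. consume all messages
    if not drained:
        return None
    # 2. re-order according to the sequence number, newest first
    drained.sort(key=lambda m: m["seq"], reverse=True)
    # Between the drain and this point the messages exist only in this
    # process; a crash here loses them, which is the weakness noted above.
    queue.extend(drained)                 # 3. re-submit to the queue
    return queue.popleft()                # 4. consume the first message


# Hypothetical stand-in for a queue holding three messages, oldest first.
q = deque({"seq": n, "body": f"job-{n}"} for n in (1, 2, 3))
msg = take_newest(q)
```

With a real broker, steps 1 and 3 would each be network round trips, and other workers could interleave their own drains, so the sketch understates how fragile the approach is.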

My research is based (in part) on:

  • http://groups.google.com/group/rabbitmq-discuss/browse_thread/thread/e79e77d86bc7a3b8?fwc=1
  • http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2010-July/007934.html
  • http://groups.google.com/group/rabbitmq-discuss/browse_thread/thread/e40d1069dcebe2cc
  • http://old.nabble.com/Priority-Queue-implementation-and-performance-td29946348.html

Note: the expected traffic of messages will roughly be in the range of 1 msg/hour for some queues and 100/minute for others. So nothing stellar.

dgorissen asked Feb 11 '11 16:02


2 Answers

Since there is no reply I guess I did my homework rather well ;)

Anyway, after discussing the requirements with the other stakeholders it was decided I can drop the LIFO requirement for now. We can worry about that when it comes to it.

A solution that we will probably end up adopting is for the worker to open a second queue that the master can use to let the worker know what jobs to ignore + provide additional control/monitoring information (which it looks like we will need anyway).
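A rough sketch of that idea, assuming the master sends the ids of obsolete jobs on the second queue. Plain lists stand in for both queues here, and `run_worker` and the job-id scheme are hypothetical, not something the broker provides.

```python
def run_worker(jobs, control):
    """Process jobs in arrival order, skipping ids the master marked obsolete."""
    ignored = set()
    done = []
    for job_id, payload in jobs:
        # Pick up any pending "ignore" notices before starting the next job.
        while control:
            ignored.add(control.pop(0))
        if job_id in ignored:
            continue                      # obsolete job: drop it unprocessed
        done.append(job_id)               # placeholder for the real work
    return done


# Jobs 1 and 2 have been superseded, so only job 3 is processed.
done = run_worker([(1, "a"), (2, "b"), (3, "c")], control=[1, 2])
```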

RabbitMQ implementing the AMQP 1.0 spec may also help here.

So I will mark this question as answered for now. Somebody else is still free to add or improve.

dgorissen answered Sep 17 '22 12:09


One possibility might be to use basic.get in a loop and wait for the message-count field of the get-ok response to become zero (throwing away all other messages):

while (true) {
  <reply> = <call basic.get>
  if (<reply is get-empty>) {
    // Queue is empty: someone else got the last message
    break;
  } else if (<reply>.message-count == 0) {
    // Now <reply> is the most recent message on this queue
    break;
  }
  // Otherwise discard <reply> and fetch the next one
}

Of course, you'd have to set up the message routing patterns on the broker such that one consumer throwing away messages doesn't interfere with another. Try to avoid requeueing messages, as they will requeue at the top of the stack, making them look like the most recent.
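A minimal Python sketch of this drain loop. The `basic_get` call shape, a `(method, properties, body)` tuple with `method` set to `None` when the queue is empty and `method.message_count` giving the number of messages remaining, is modelled on pika's blocking channel; treat that mapping as an assumption. `FakeChannel` is a hypothetical in-memory stand-in so the example runs without a broker.

```python
from collections import namedtuple

GetOk = namedtuple("GetOk", "message_count")


class FakeChannel:
    """Hypothetical stand-in for a client channel; holds bodies oldest first."""

    def __init__(self, bodies):
        self._bodies = list(bodies)

    def basic_get(self, queue, auto_ack=False):
        if not self._bodies:
            return None, None, None       # corresponds to get-empty
        body = self._bodies.pop(0)
        return GetOk(len(self._bodies)), None, body


def get_newest(channel, queue):
    """Discard older messages and return the body of the newest, or None."""
    newest = None
    while True:
        # auto_ack=True means every fetched message is gone from the queue,
        # including the older ones that are thrown away here.
        method, _properties, body = channel.basic_get(queue=queue, auto_ack=True)
        if method is None:
            return newest                 # queue drained (possibly by someone else)
        newest = body
        if method.message_count == 0:
            return newest                 # nothing left behind this message


latest = get_newest(FakeChannel([b"old", b"mid", b"new"]), "jobs")
```

Because the older messages are acknowledged as they are fetched, this only suits the case where they really are obsolete; nothing is left for spare workers to pick up.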

Robin answered Sep 19 '22 12:09