Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ wait for multiple queues to finish

Ok here is an overview of what's going on:

    M <-- Message with unique id of 1234
    |
    +-Start Queue
    |
    |
    | <-- Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues
Q1  Q2  Q3
\   |   / <-- start of the problem is here
 \  |  / 
  \ | /
   \|/
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

So I have an exchange that pushes to multiple queues, each queue has a task, once all tasks are completed, only then can Queue 4 start.

So message with unique id of 1234 gets sent to the exchange, the exchange routes it to all the task queues ( Q1, Q2, Q3, etc... ), when all the tasks for message id 1234 have completed, run Q4 for message id 1234.

How can I implement this?

Using Symfony2, RabbitMQBundle and RabbitMQ 3.x

Resources:

  • http://www.rabbitmq.com/tutorials/amqp-concepts.html
  • http://www.rabbitmq.com/tutorials/tutorial-six-python.html

UPDATE #1

Ok I think this is what I'm looking for:

  • https://github.com/videlalvaro/Thumper/tree/master/examples/parallel_processing

RPC with Parallel Processing, but how do I set the Correlation Id to be my unique id to group the messages and also identify what queue?

like image 257
Phill Pafford Avatar asked Dec 13 '12 14:12

Phill Pafford


1 Answers

You need to implement this: http://www.eaipatterns.com/Aggregator.html but the RabbitMQBundle for Symfony doesn't support that so you would have to use the underlying php-amqplib.

A normal consumer callback from the bundle will get an AMQPMessage. From there you can access the channel and manually publish to whatever exchanges comes next in your "pipes and filters" implementation

like image 128
old_sound Avatar answered Sep 20 '22 14:09

old_sound