Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ Implementation

Here is a little about the setup I have currently.

  • REST API to push ( POST ) data into a queue
  • The Queue has a Consumer that's always running and Produces to en Exchange
  • The Exchange routes to several other Queues ( like 20+ )
  • Each of the ( 20+ ) Queues does a specific task ( The Consumers always runs as well )
  • Cron job runs to check if all the ( 20+ ) Tasks are completed and Produces to yet another Queue

I'm not sure I like the Consumers running all the time as each Consumer utilizes around 300MB of Ram ( I think it's MBs, it's not in front of me at the moment ) and I'm looking for another implementation.

    M <-- Message coming from REST API
    |
    |
    +-First Queue
    |
    |
    | <-- The Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues ( 20+ )
Q1  Q2  Q3 <-- Each Queue is a task that must be completed


    | <-- CRON runs to check if all queues above have completed
    |
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

I the related question below it was suggested to use RPC but the problem with this is the RPC ( to my understanding ) Will have multiple instances. This is a resource intense process as is and I think by adding the RPC calls it will just bog down the server and then become unresponsive ( Please correct me if I'm wrong ).

Another approach was to use the Aggregator pattern

  • http://www.eaipatterns.com/Aggregator.html

Which looks exactly what I need but I found the documentation limited. Has anyone done this pattern?

My question is I'm not happy with how it's currently implemented and I'm looking for ways to improve the process. I'm looking to either get rid of the CRON, Implement a new Pattern and Not have the Consumers run all the time.

The process currently also only supports a single instance of each consumer. It can have multiple consumers but how we have implemented it we only wanted one at the time.

This is implemented in PHP, Symfony2 Framework using the RabbitMQBundle

Related Question:

  • RabbitMQ wait for multiple queues to finish
like image 558
Phill Pafford Avatar asked Dec 06 '25 08:12

Phill Pafford


1 Answers

OldSound here, creator of the RabbitMQ Bundle.

The bundle per se doesn't support the Aggregator pattern out of the box but you could implement it with the underlying php-amqplib.

To perform the aggregation you need to publish messages with a correlation ID and thread that id along the processing chain. Then the aggregator will be waiting for X amount of messages, according to the amount of different workers you have to process that particular task. A way to wait for messages is to have an array were you keep them as they come indexed by correlation id.

So whenever you have an incoming message you will do:

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

And then somewhere you do:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}

I'm actually working on a Workflow bundle for Symfony right now which I plan to open source any time soon. That bundle could be used to implement the use case you present in a very simple way (i.e. you will only need to provide services for each task).

Now I wonder why each consumers take 300 MB of RAM? Do you need to run the full stack framework with them? If possible create a new Symfony Kernel for a consumers app and only load what you need there to reduce overhead.

like image 119
old_sound Avatar answered Dec 08 '25 20:12

old_sound