How to limit concurrent message consuming based on a criteria

The scenario (I've simplified things):

  • Many end users can start jobs (heavy jobs, like rendering a big PDF for example), from a front end web application (producer).
  • The jobs are sent to a single durable RabbitMQ queue.
  • Many worker applications (consumers) process those jobs and write the results back to a datastore.

This fairly standard pattern is working fine.

The problem: if a user starts 10 jobs in the same minute, and only 10 worker applications are up at that time of day, this end user effectively takes over all the compute capacity for themselves.

The question: How can I make sure only one job per end user is processed at any time? (Bonus: some end users, such as admins, must not be throttled.)

Also, I do not want the front end application to block end users from starting concurrent jobs. I just want the end users to wait for their concurrent jobs to finish one at a time.

The solution?: Should I dynamically create one auto-delete exclusive queue per end user? If so, how can I tell the worker applications to start consuming this queue? How can I ensure one (and only one) worker will consume from this queue?

asked Feb 09 '15 by Pierre-David Belanger

People also ask

What is consumer capacity in RabbitMQ?

Consumer capacity will be 0% for queues that have no consumers. For queues that have online consumers but no message flow, the value will be 100%: the idea is that any number of consumers can sustain this kind of delivery rate.

How many messages can RabbitMQ handle per second?

The RabbitMQ message broker was deployed atop Google Compute Engine where it demonstrated the ability to receive and deliver more than one million messages per second (a sustained combined ingress/egress of over two million messages per second).

Can a RabbitMQ queue have multiple consumers?

RabbitMQ has a plugin for a consistent hash exchange. Using that exchange, with one consumer per queue, we can achieve message order with multiple consumers. The hash exchange distributes routing keys among queues, instead of messages among queues. This means all messages with the same routing key will go to the same queue.
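The key property can be sketched without a broker. This is a minimal pure-Python stand-in, not the plugin's actual hash-ring algorithm; the `route` function and queue names are illustrative only:

```python
import hashlib

def route(routing_key, queues):
    # Hash the routing key and map it onto one of the queues.
    # The real x-consistent-hash exchange uses a weighted hash ring;
    # this sketch only demonstrates the property that matters here:
    # the same routing key always lands on the same queue.
    digest = hashlib.sha256(routing_key.encode()).hexdigest()
    return queues[int(digest, 16) % len(queues)]

queues = ["q0", "q1", "q2"]

# Messages sharing a routing key (e.g. a user id) all go to one
# queue, so a single consumer on that queue sees them in order.
assert route("user-42", queues) == route("user-42", queues)
```

With one consumer bound to each queue, per-key ordering is preserved while still spreading distinct keys across consumers.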

Does RabbitMQ push or pull?

RabbitMQ uses a push-based model: the broker pushes messages to consumers as they arrive. A prefetch limit is defined on the consumer to stop the broker from overwhelming it with unacknowledged messages. Such a push-based approach is suited for low-latency messaging.
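The effect of a prefetch limit can be illustrated with a toy dispatcher (pure Python, no broker; all names here are hypothetical). With `prefetch_count=1`, each consumer holds at most one unacknowledged message at a time:

```python
from collections import deque

def dispatch(messages, consumers, prefetch_count=1):
    # Toy model of prefetch: a consumer is only given another message
    # once its unacked count is below the prefetch limit. The real
    # broker waits for acks; this sketch "acks" everything at once
    # when all consumers are at their limit.
    queue = deque(messages)
    unacked = {c: 0 for c in consumers}
    assigned = {c: [] for c in consumers}
    while queue:
        progress = False
        for c in consumers:
            if queue and unacked[c] < prefetch_count:
                assigned[c].append(queue.popleft())
                unacked[c] += 1
                progress = True
        if not progress:
            # Everyone is at the limit; simulate all acks arriving.
            for c in consumers:
                unacked[c] = 0
    return assigned

out = dispatch(list(range(6)), ["c1", "c2"])
# Work alternates between consumers instead of burying one of them.
```

In real code this corresponds to setting the prefetch on the channel before consuming, so a slow consumer cannot accumulate a large backlog of deliveries.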


1 Answer

As Dimos says, you would need to build something yourself to implement this. Here is an alternative implementation which requires an extra queue and some persistent storage.

  • As well as the existing queue for jobs, create a "processable job queue". Only jobs that satisfy your business rules are added to this queue.
  • Create a consumer (named "Limiter") for the job queue. The Limiter also needs persistent storage (e.g. Redis or a relational database) to record which jobs are currently processing. The limiter reads from the job queue and writes to the processable job queue.
  • When a worker application finishes processing a job, it adds a "job finished" event to the job queue.

    ------------     ------------     ----------- 
    | Producer | -> () job queue ) -> | Limiter | 
    ------------     ------------     ----------- 
                         ^                |                    
                         |                V                    
                         |     ------------------------       
                         |    () processable job queue )  
           job finished  |     ------------------------       
                         |                |
                         |                V
                         |     ------------------------
                         \-----| Job Processors (x10) |
                               ------------------------
    

The logic for the limiter is as follows:

  • When a job message is received, check the persistent storage to see if a job is already running for the current user:
    • If not, record the job in the storage as running and add the job message to the processable job queue.
    • If an existing job is running, record the job in the storage as a pending job.
    • If the job is for an admin user, always add it to the processable job queue.
  • When a "job finished" message is received, remove that job from the "running jobs" list in the persistent storage. Then check the storage for a pending job for that user:
    • If a job is found, change the status of that job from pending to running and add it to the processable job queue.
    • Otherwise, do nothing.
  • Only one instance of the limiter process can run at a time. This could be achieved either by only starting a single instance of the limiter process, or by using locking mechanisms in the persistent storage.
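The limiter logic above can be sketched with in-memory stand-ins for the queues and the persistent store. Everything here is illustrative (`Limiter`, `on_job`, `on_job_finished` are made-up names); a real implementation would consume from RabbitMQ and keep `running`/`pending` in Redis or a relational database:

```python
from collections import defaultdict, deque

class Limiter:
    """Sketch of the limiter: at most one running job per user, a
    pending queue per user, and an admin bypass. 'processable' stands
    in for the processable job queue; 'running' and 'pending' stand
    in for the persistent storage."""

    def __init__(self, admins=frozenset()):
        self.admins = set(admins)
        self.running = set()               # user ids with a job running
        self.pending = defaultdict(deque)  # user id -> parked jobs
        self.processable = []              # stand-in for the out-queue

    def on_job(self, user, job):
        if user in self.admins:
            # Admins are never throttled.
            self.processable.append(job)
        elif user in self.running:
            # User already has a running job: park this one as pending.
            self.pending[user].append(job)
        else:
            # User is idle: record the job as running and forward it.
            self.running.add(user)
            self.processable.append(job)

    def on_job_finished(self, user):
        # Remove the finished job from the running set, then promote
        # the user's next pending job, if any.
        self.running.discard(user)
        if self.pending[user]:
            self.running.add(user)
            self.processable.append(self.pending[user].popleft())
```

For example, if `alice` submits two jobs back to back, only the first reaches the processable queue; the second is forwarded when her first job's "job finished" event arrives. An admin's jobs pass straight through.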

It's fairly heavyweight, but you can always inspect the persistent storage if you need to see what's going on.

answered Oct 12 '22 by Nathan