 

Redis - Better way of cleaning the processing queue (reliable) while using BRPOPLPUSH

Our Current Design

Environment: Redis 2.8.17

We have implemented our reliable queue using a pattern similar to the one described in the Redis documentation under RPOPLPUSH.

However, we use BRPOPLPUSH for its blocking nature, and LPUSH to ensure FIFO order.

Producers: multiple threads (on multiple servers) using LPUSH to push items.

Consumers: multiple threads (on multiple servers) using BRPOPLPUSH to process items.

BRPOPLPUSH q processing-q

As documented, Redis pops an item from queue 'q' while atomically adding it to 'processing-q'.
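For illustration, here is a minimal sketch of that design, assuming the redis-py client (we don't know the asker's actual stack); handle_item is a hypothetical stand-in for the real consumer logic:

```python
import redis

r = redis.Redis(host="localhost", port=6379)

def handle_item(item: bytes) -> None:
    print("processing", item)  # hypothetical work

def produce(item: str) -> None:
    # LPUSH to the head; BRPOPLPUSH pops from the tail, giving FIFO order.
    r.lpush("q", item)

def consume_forever() -> None:
    while True:
        # Atomically pop from 'q' and add to 'processing-q';
        # blocks until an item arrives (timeout=0 means wait forever).
        item = r.brpoplpush("q", "processing-q", timeout=0)
        handle_item(item)
        # How to reliably clean 'processing-q' afterwards is the problem below.
```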

Problem

Owing to the multi-threaded (async) nature of our application, we have no control over when each consumer will finish its processing.

So if we use LREM (as per the documentation) to remove the processed element from processing-q, it only removes the head element of processing-q, with no guarantee that it removes the actual element that the respective consumer processed.

So if we do nothing, processing-q keeps growing (eating up memory), which is very bad IMHO.

Any suggestions or ideas?

asked Jan 16 '15 by aspdeepak

3 Answers

You just need to include the job you want to delete in your call to LREM.

LREM takes the form:

LREM queue count "object"

It removes up to count items equal to "object" from queue. So to remove the specific job your consumer thread is working on, you'd do something like this:

LREM processing-q 1 "job_identifier"

For more see the documentation here: http://redis.io/commands/lrem
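In redis-py terms (an assumption about the client; any client works), that call looks like this:

```python
import redis

r = redis.Redis()
# Remove one occurrence of this exact payload, scanning head to tail;
# a count of 0 would remove every match.
r.lrem("processing-q", 1, "job_identifier")
```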

Then, to handle crashed consumers and abandoned jobs, you can use SETEX to create locks with an expiration and periodically check for jobs without locks.

So the whole process looks like this (a code sketch follows the list):

Producer

  1. LPUSH q "job_identifier" (LPUSH rather than RPUSH, to keep the FIFO order from the question)

Consumer

  1. job_identifier = BRPOPLPUSH q processing-q (the identifier is only known after the pop)
  2. SETEX lock:processing-q:job_identifier 60 (set the lock before processing, so the monitor won't requeue an in-flight job)
  3. Process job
  4. LREM processing-q 1 "job_identifier"

Expired Jobs Monitor

  1. jobs = LRANGE processing-q 0 -1
  2. foreach job in jobs: lock = GET lock:processing-q:job_identifier
  3. if lock is null, the job timed out, so remove it from processing-q: LREM processing-q 1 "job_identifier"
  4. then retry with LPUSH q "job_identifier"
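Here is a sketch of that whole flow, assuming the redis-py client, that job payloads are unique identifiers, and that LOCK_TTL exceeds the worst-case processing time; process_job is a hypothetical stand-in for real work, and the explicit lock delete at the end is an addition (the lock would expire on its own anyway):

```python
import redis

r = redis.Redis()
LOCK_TTL = 60  # seconds, as in the SETEX step above

def process_job(job: bytes) -> None:
    print("working on", job)  # hypothetical work

def producer(job_id: str) -> None:
    r.lpush("q", job_id)

def consumer() -> None:
    while True:
        job = r.brpoplpush("q", "processing-q", timeout=0)
        # Lock before processing so the monitor won't requeue an in-flight
        # job. There is a tiny window between the pop and this SETEX;
        # running the monitor with some tolerance covers it.
        r.setex(b"lock:processing-q:" + job, LOCK_TTL, b"1")
        process_job(job)
        r.lrem("processing-q", 1, job)
        r.delete(b"lock:processing-q:" + job)  # optional; it would expire anyway

def monitor() -> None:
    # Run periodically: requeue jobs whose lock has expired (consumer died).
    for job in r.lrange("processing-q", 0, -1):
        if r.get(b"lock:processing-q:" + job) is None:
            r.lrem("processing-q", 1, job)
            r.lpush("q", job)
```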

@NotAUser has published an open-source Java implementation here: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

answered Oct 02 '22 by Loren_

The approach I would take is to use a per-consumer processing-q (e.g. processing-q:consumer-id). That would solve your current problem, but you'd still need to handle crashed consumers somehow. For that, I suggest you also keep the last time each consumer popped a task and periodically check for timeouts. If a consumer has exceeded the timeout, move its task back to the main queue and delete its queue.
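A sketch of this idea, again assuming redis-py; the "last-pop" hash, the consumer_id argument, and the 120-second timeout are illustrative choices, not part of the answer:

```python
import time
import redis

r = redis.Redis()
TIMEOUT = 120  # illustrative; must exceed the worst-case processing time

def handle(task: bytes) -> None:
    print("handling", task)  # hypothetical work

def consume(consumer_id: str) -> None:
    pq = f"processing-q:{consumer_id}"  # per-consumer processing queue
    while True:
        task = r.brpoplpush("q", pq, timeout=0)
        # Record when this consumer last popped a task.
        r.hset("last-pop", consumer_id, int(time.time()))
        handle(task)
        r.lrem(pq, 1, task)

def reap_timed_out_consumers() -> None:
    now = int(time.time())
    for cid, ts in r.hgetall("last-pop").items():
        if now - int(ts) > TIMEOUT:
            pq = b"processing-q:" + cid
            # Move any stuck tasks back to the main queue...
            while r.rpoplpush(pq, "q") is not None:
                pass
            # ...then forget the dead consumer (its emptied queue is
            # deleted automatically by Redis).
            r.hdel("last-pop", cid)
```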

answered Oct 02 '22 by Itamar Haber

In a similar project, I'm using the hostname and the process ID of the worker for the backup queues. Each worker has its own backup queue, so if the worker dies, the item is not lost.

Check the README and the implementation for more details.
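The naming scheme might look like this (a redis-py sketch under the same assumptions as above; this is not soveran's actual code, which lives in the linked project):

```python
import os
import socket
import redis

r = redis.Redis()

# Each worker gets its own backup queue, keyed by hostname and PID, so a
# crashed worker's items survive and can be found by an external sweeper.
backup_queue = f"backup:{socket.gethostname()}:{os.getpid()}"

def do_work(item: bytes) -> None:
    print("working on", item)  # hypothetical work

def work_loop() -> None:
    while True:
        item = r.brpoplpush("q", backup_queue, timeout=0)
        do_work(item)
        r.lrem(backup_queue, 1, item)
```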

answered Oct 02 '22 by soveran