Environment: Redis 2.8.17
We have implemented a reliable queue using a pattern similar to the one described in the Redis documentation under RPOPLPUSH.
However, we use BRPOPLPUSH because of its blocking nature, and LPUSH to ensure FIFO order.
Producers: multiple threads (on multiple servers) using LPUSH to push items.
Consumers: multiple threads (on multiple servers) using BRPOPLPUSH to process items.
BRPOPLPUSH q processing-q
As documented, Redis pops an item from queue 'q' and atomically adds it to 'processing-q'.
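Roughly, our setup looks like this in redis-py (simplified; process() stands in for our actual handler and the connection details are omitted):

import redis

r = redis.Redis()

# Producer: LPUSH to the head of 'q'; BRPOPLPUSH pops from the tail, so this gives FIFO order
r.lpush('q', 'job_identifier')

# Consumer: atomically pop from 'q' and park the item on 'processing-q',
# blocking until an item is available (timeout=0 blocks indefinitely)
job = r.brpoplpush('q', 'processing-q', timeout=0)
process(job)  # placeholder for our actual handler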
Owing to the multi-threaded (async) nature of our application, we have no control over when a consumer will finish its processing.
So if we use LREM (as per the documentation) to remove the processed element from processing-q, it seems to remove only the top element of processing-q, with no guarantee that it removed the actual element processed by the respective consumer.
And if we do nothing, processing-q keeps growing (eating up memory), which is very bad IMHO.
Any suggestions or ideas?
You just need to include the job you want to delete in your call to LREM.
LREM takes the form:
LREM queue count "object"
It will remove count items equal to "object" from queue. So, to remove the specific job your consumer thread is working on, you'd do something like this:
LREM processing-q 1 "job_identifier"
For more see the documentation here: http://redis.io/commands/lrem
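In other words, LREM removes by value, not by position. Since BRPOPLPUSH returns the job the consumer popped, the consumer can pass exactly that value back to LREM. A rough redis-py sketch (do_work() is a placeholder for your processing step):

import redis

r = redis.Redis()

job = r.brpoplpush('q', 'processing-q', timeout=0)  # the exact value this consumer owns
do_work(job)                                        # placeholder for your processing step
r.lrem('processing-q', 1, job)                      # removes this job, not whatever is at the head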
Then, to handle crashed consumers and abandoned jobs, you can use SETEX to create locks with an expiration and periodically check for jobs that have no lock.
So the whole process looks like this (a rough redis-py sketch of the full flow follows the steps below):
Producer
LPUSH q "job_identifier"
Consumer
SETEX lock:processing-q:job_identifier 60 1
(Set the lock first to avoid a race condition.)
BRPOPLPUSH q processing-q
LREM processing-q 1 "job_identifier"
Expired Jobs Monitor
LRANGE processing-q 0 -1
GET lock:processing-q:job_identifier (for each job in the list; a missing lock means the job was abandoned)
LREM processing-q 1 "job_identifier"
RPUSH q "job_identifier"
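Here is a rough redis-py sketch of that flow. Note the lock key includes the job identifier, so in practice the consumer creates the lock immediately after BRPOPLPUSH returns and before doing any work; handle(), requeue_abandoned() and the connection details are placeholders:

import redis

r = redis.Redis()
LOCK_TTL = 60  # seconds, matching the SETEX above

def produce(job_id):
    r.lpush('q', job_id)

def consume():
    while True:
        job = r.brpoplpush('q', 'processing-q', timeout=0)
        # Lock the job before doing any work so the monitor won't requeue it
        r.setex('lock:processing-q:' + job.decode(), LOCK_TTL, 1)
        handle(job)  # placeholder for your processing step
        r.lrem('processing-q', 1, job)

def requeue_abandoned():
    # Run this periodically: any job on processing-q without a live lock
    # is presumed abandoned (its consumer crashed or its lock expired)
    for job in r.lrange('processing-q', 0, -1):
        if not r.exists('lock:processing-q:' + job.decode()):
            r.lrem('processing-q', 1, job)
            r.rpush('q', job)  # RPUSH puts it at the pop end, so it is retried promptly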
@NotAUser has published an open-source Java implementation here: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq
The approach I would take is to use a per-consumer processing-q (e.g. processing-q:consumer-id). That would solve your current problem but you'd still need to handle crashed consumers somehow. For that, I suggest you also keep the last time that each consumer popped a task and periodically check for timeouts. If a consumer has reached the timeout, move its task back to the main queue and delete its queue.
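A rough redis-py sketch of this idea (the 'consumer-last-pop' hash, the work() call and the 300-second timeout are just illustrative choices):

import time
import redis

r = redis.Redis()

def consume(consumer_id):
    processing_q = 'processing-q:' + consumer_id  # one backup queue per consumer
    while True:
        job = r.brpoplpush('q', processing_q, timeout=0)
        r.hset('consumer-last-pop', consumer_id, int(time.time()))  # record last pop time
        work(job)  # placeholder for your processing step
        r.lrem(processing_q, 1, job)

def requeue_timed_out(timeout=300):
    # Periodically move tasks of timed-out consumers back to the main queue
    now = int(time.time())
    for consumer_id, last_pop in r.hgetall('consumer-last-pop').items():
        if now - int(last_pop) > timeout:
            stale_q = 'processing-q:' + consumer_id.decode()
            while r.rpoplpush(stale_q, 'q'):  # drain the consumer's queue back into 'q'
                pass
            r.hdel('consumer-last-pop', consumer_id)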
In a similar project, I'm using the worker's hostname and process id to name the backup queues. Each worker has its own backup queue, and if the worker dies, the item is not lost.
Check the README and the implementation for more details.
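For example (a minimal sketch; handle() and the connection details are placeholders):

import os
import socket
import redis

r = redis.Redis()

# Name the backup queue after this worker's hostname and pid, so each worker
# has its own queue and items held by a dead worker can be found and recovered
backup_q = 'processing-q:' + socket.gethostname() + ':' + str(os.getpid())

job = r.brpoplpush('q', backup_q, timeout=0)
handle(job)               # placeholder for the actual work
r.lrem(backup_q, 1, job)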