Redis reliable queues for multi-threaded processing

For my ongoing project, I am using Redis to distribute messages across several processes. Now I need to make that delivery reliable.

I am considering the reliable queue pattern, which is built on the BRPOPLPUSH command. In this pattern, once a job has completed successfully, the processing thread removes the extra copy of the message from the "processing list" with the LREM command.

Because several threads pop from the same queue, the extra copies of the popped items from all of them end up in one shared processing list. As a result, when a thread finishes its job, it cannot tell which item to remove from the processing list.

To work around this, I am thinking of maintaining one processing queue per thread, keyed by thread ID. My BRPOPLPUSH call would then be:

BRPOPLPUSH <primary-queue> <thread-specific-processing-queue> <timeout>

To clean up timed-out items, my monitoring thread would then have to watch all of these thread-specific processing queues.
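The per-thread scheme can be modelled in memory to check the bookkeeping. This is a sketch under stated assumptions, not a Redis client: Python lists stand in for Redis lists (index 0 is the head, so RPOPLPUSH pops the last element and inserts it at index 0), and the key names `queue:primary` and `processing:<thread_id>`, as well as every helper name, are hypothetical. Against a real server the worker would issue BRPOPLPUSH (or BLMOVE with RIGHT LEFT, which supersedes it from Redis 6.2) and LREM. Because plain lists carry no timestamps, the monitor here assumes it is told which threads have died and simply requeues their leftovers.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for Redis lists: index 0 is the head.
lists = defaultdict(list)

PRIMARY = "queue:primary"  # hypothetical key name


def processing_key(thread_id):
    # One processing list per worker thread, as proposed in the question.
    return f"processing:{thread_id}"


def rpoplpush(src, dst):
    """Mimic RPOPLPUSH: pop from the tail of src, push onto the head of dst."""
    if not lists[src]:
        return None  # BRPOPLPUSH would block here instead of returning
    item = lists[src].pop()
    lists[dst].insert(0, item)
    return item


def lrem(key, count, value):
    """Mimic LREM for count > 0: remove up to `count` matches, head first."""
    lst = lists[key]
    removed = 0
    i = 0
    while i < len(lst) and removed < count:
        if lst[i] == value:
            del lst[i]
            removed += 1
        else:
            i += 1
    return removed


def take_job(thread_id):
    # Worker: move the next job into this thread's own processing list.
    return rpoplpush(PRIMARY, processing_key(thread_id))


def ack_job(thread_id, item):
    # Worker: the job is done, so drop the copy from its own processing list.
    return lrem(processing_key(thread_id), 1, item)


def recover_dead_thread(thread_id):
    """Monitor: push a dead worker's unfinished items back onto the queue."""
    key = processing_key(thread_id)
    recovered = []
    while lists[key]:
        recovered.append(rpoplpush(key, PRIMARY))
    return recovered
```

Since each worker only touches its own processing list, an acknowledgement can never remove another thread's copy of an identical message; the cost is that the monitor has to enumerate every `processing:*` list.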

Is there a better approach to this problem than the one described above?

asked Nov 01 '22 06:11 by Mopparthy Ravindranath


1 Answer

@user779159

To support a reliable queue, we use the following approach:

 - Two data structures:
    -- a Redis list (the original queue, from which items are popped regularly)
    -- a Redis sorted set (z-set), which temporarily stores each popped item

Algorithm:

-- When an element is popped, we store it in the z-set.
-- If the task that picked up the item completes its job, it deletes the entry from the z-set.
-- If the task cannot complete it, the item stays in the z-set, so we can tell whether a task finished within the expected time.
-- Meanwhile, a background process periodically scans the z-set, picks up items that have timed out, and puts them back on the queue.
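The worker side of this algorithm (the first three steps) can be sketched in memory as follows. It is a model, not Redis client code: a Python list stands in for the Redis list (popped from the tail, as BRPOP does) and a dict mapping member to score stands in for the z-set. The function names are hypothetical, and the score is assumed to be an absolute deadline, i.e. pop time plus a per-task timeout.

```python
import time


def pop_and_track(queue, in_flight, timeout, now=None):
    """Pop the next item and record it in the in-flight z-set.

    queue models a Redis list (index 0 = head; items leave from the tail).
    in_flight models a sorted set as member -> score, where the score is
    the deadline by which the task must finish (an assumption of this sketch).
    """
    if not queue:
        return None  # BRPOP would block here instead of returning
    now = time.time() if now is None else now
    item = queue.pop()               # pop from the tail
    in_flight[item] = now + timeout  # ZADD in_flight <deadline> <item>
    return item


def complete(in_flight, item):
    """Task finished: delete its z-set entry (ZREM). True if it was there."""
    return in_flight.pop(item, None) is not None


def is_overdue(in_flight, item, now=None):
    """An item still in the z-set past its deadline was not done in time."""
    now = time.time() if now is None else now
    return item in in_flight and in_flight[item] <= now
```

One consequence of using a sorted set: its members are unique, so two queued messages with identical payloads would collapse into a single in-flight entry; giving every message a unique ID avoids that. In real Redis the pop and the ZADD should also happen as one atomic step, which is what the Lua script mentioned below provides, since a crash between the two commands would lose the item.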

How it is done:

  • We use a z-set to store each item we pop (typically with a Lua script, so that popping the item and adding it to the z-set happen atomically).
  • We store a timeout value (the deadline by which the task must finish) as the item's score.
  • Another scanner process periodically (say, every minute) runs ZRANGEBYSCORE on the z-set to select items whose score falls between one minute ago and now, i.e. items that timed out since the previous scan.
  • If the command returns any items, the process that popped them (via BRPOP) did not complete its task in time.
  • So this second process removes each such item from the z-set and puts it back on the queue (the Redis list) it originally came from.
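One scanner pass can be sketched in memory as well, again with hypothetical names: a dict maps each in-flight item to its deadline score, and the helper mimics ZRANGEBYSCORE with inclusive bounds. Passing `lookback=60` reproduces the one-minute window described above; passing no lookback scans from -inf, which is an assumption of this sketch rather than part of the answer.

```python
import math


def zrangebyscore(zset, min_score, max_score):
    """Mimic ZRANGEBYSCORE with inclusive bounds: members ordered by score."""
    hits = [(score, member) for member, score in zset.items()
            if min_score <= score <= max_score]
    return [member for score, member in sorted(hits)]


def scan_and_requeue(queue, in_flight, now, lookback=None):
    """One pass of the scanner process.

    in_flight maps item -> deadline (its z-set score). lookback=60 gives the
    "last minute" window; lookback=None scans from -inf, which also catches
    items that a late or skipped scan would otherwise leave behind.
    """
    low = -math.inf if lookback is None else now - lookback
    requeued = []
    for item in zrangebyscore(in_flight, low, now):
        # In Redis, requeue only if ZREM returns 1: an item that was acked
        # or claimed by another scanner in the meantime is left alone.
        if in_flight.pop(item, None) is not None:
            queue.insert(0, item)  # LPUSH back onto the original list
            requeued.append(item)
    return requeued
```

The lower bound matters when a scan runs late: if a pass happens at t=200 instead of shortly after t=100, an item with deadline 100 lies outside the window [140, 200] and stays in the z-set indefinitely, whereas a -inf lower bound still finds it.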
answered Nov 12 '22 00:11 by Mopparthy Ravindranath