
Scalable delayed task execution with Redis

I need to design a Redis-driven scalable task scheduling system.

Requirements:

  • Multiple worker processes.
  • Many tasks, but long periods of idleness are possible.
  • Reasonable timing precision.
  • Minimal resource waste when idle.
  • Should use the synchronous Redis API.
  • Should work for Redis 2.4 (i.e. no features from upcoming 2.6).
  • Should not use any means of RPC other than Redis.

Pseudo-API: schedule_task(timestamp, task_data). Timestamp is in integer seconds.

Basic idea:

  • Listen for upcoming tasks on a list.
  • Put tasks into per-timestamp buckets.
  • Sleep until the closest timestamp.
  • If a new task appears with a timestamp earlier than the closest one, wake up.
  • Process all upcoming tasks with timestamp ≤ now, in batches (assuming that task execution is fast).
  • Make sure that concurrent workers don't process the same tasks. At the same time, make sure that no tasks are lost if we crash while processing them.

So far I can't figure out how to fit this into Redis primitives...

Any clues?

Note that there is a similar old question: Delayed execution / scheduling with Redis? In this new question I introduce more details (most importantly, many workers). So far I haven't been able to figure out how to apply the old answers here, hence this new question.

Asked Jun 03 '12 07:06 by Alexander Gladysh




2 Answers

Here's another solution that builds on a couple of others [1]. It uses the Redis WATCH command to remove the race condition without needing the Lua scripting that arrives in Redis 2.6.

The basic scheme is:

  • Use a Redis zset for scheduled tasks and Redis lists as queues for ready-to-run tasks.
  • Have a dispatcher poll the zset and move tasks that are ready to run into the queues. You may want more than one dispatcher for redundancy, but you probably don't need or want many.
  • Have as many workers as you want, each doing blocking pops on the queues.
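To make the division of labor concrete, here is a toy in-memory model of the scheme in plain Python, with no Redis involved. A heap stands in for the sorted set and `deque`s stand in for the lists. The class and method names are hypothetical. The model ignores concurrency entirely and only shows which role touches which structure.

```python
import heapq
from collections import deque


class ToyScheduler:
    """Hypothetical in-memory stand-in for the zset + lists scheme (no Redis)."""

    def __init__(self):
        self.scheduled = []  # heap of (run_at, seq, queue, data), plays the zset
        self.queues = {}     # queue name -> deque, plays the ready lists
        self._seq = 0        # tie-breaker so equal run_at values never compare data

    def schedule_task(self, queue, data, run_at):
        # Client side: roughly ZADD <zset> run_at <task>
        heapq.heappush(self.scheduled, (run_at, self._seq, queue, data))
        self._seq += 1

    def dispatch(self, now):
        # Dispatcher: move every task with run_at <= now to its ready list
        moved = 0
        while self.scheduled and self.scheduled[0][0] <= now:
            _, _, queue, data = heapq.heappop(self.scheduled)
            self.queues.setdefault(queue, deque()).append(data)  # roughly RPUSH
            moved += 1
        return moved

    def pop(self, queue):
        # Worker: take the oldest ready task, or None (like BLPOP timing out)
        q = self.queues.get(queue)
        return q.popleft() if q else None


s = ToyScheduler()
s.schedule_task('foo_queue', 'late', run_at=120)
s.schedule_task('foo_queue', 'early', run_at=60)
print(s.dispatch(now=90))  # 1: only 'early' is due
print(s.pop('foo_queue'))  # early
print(s.pop('foo_queue'))  # None
```

A heap returns the smallest score first, which is the same order ZRANGEBYSCORE returns from a zset.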

I haven't tested it :-)

The foo job creator would do:

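import json
import time

from redis import StrictRedis

# In redis-py >= 3.0, Redis and StrictRedis are the same class.
redis = StrictRedis()
SCHEDULED_ZSET_KEY = 'scheduled_tasks'  # any key name works

def schedule_task(queue, data, delay_secs):
    # time.time() counts seconds since the Unix epoch, so daylight saving
    # changes don't affect it, but clock adjustments (e.g. NTP corrections)
    # still can. Improvements welcome :-)
    run_at = time.time() + delay_secs

    # Sorted-set members must be strings, so store the task as JSON.
    # redis-py >= 3.0 expects a {member: score} mapping.
    task = json.dumps({'queue': queue, 'data': data})
    redis.zadd(SCHEDULED_ZSET_KEY, {task: run_at})

schedule_task('foo_queue', foo_data, 60)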

The dispatcher(s) would look like:

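from redis.exceptions import WatchError

while working:
    with redis.pipeline() as pipe:
        try:
            # WATCH: the EXEC below fails if anyone modifies the zset meanwhile.
            pipe.watch(SCHEDULED_ZSET_KEY)
            # While watching, pipeline commands run immediately.
            results = pipe.zrangebyscore(
                SCHEDULED_ZSET_KEY, '-inf', time.time(), start=0, num=1)
            if not results:
                pipe.unwatch()
                time.sleep(1)
                continue
            task = json.loads(results[0])  # members are JSON-encoded dicts
            pipe.multi()
            pipe.rpush(task['queue'], task['data'])  # data: str/bytes/number
            pipe.zrem(SCHEDULED_ZSET_KEY, results[0])
            pipe.execute()
        except WatchError:
            pass  # another dispatcher moved the task first; just retry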
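A small model shows why WATCH keeps two dispatchers from moving the same task. It is plain Python and the names are hypothetical. A version counter stands in for Redis's tracking of watched keys, and the "EXEC" step aborts if the counter changed after the "WATCH". In redis-py, the real aborted transaction surfaces as `redis.exceptions.WatchError`. The local `WatchError` class below only imitates it.

```python
class WatchError(Exception):
    """Local stand-in for redis.exceptions.WatchError."""


class ToyStore:
    """Hypothetical model of WATCH/MULTI/EXEC: EXEC fails if a watched key changed."""

    def __init__(self):
        self.zset = {'job-1': 10}  # member -> score
        self.ready = []
        self.version = 0           # bumped on every write to the zset

    def watch(self):
        return self.version

    def exec_move(self, watched_version, member):
        # EXEC: abort if someone modified the zset after our WATCH
        if self.version != watched_version:
            raise WatchError()
        del self.zset[member]      # ZREM
        self.ready.append(member)  # RPUSH
        self.version += 1


store = ToyStore()
# Two dispatchers WATCH and read the same due task...
v_a, v_b = store.watch(), store.watch()
store.exec_move(v_a, 'job-1')      # dispatcher A wins
try:
    store.exec_move(v_b, 'job-1')  # dispatcher B's EXEC is rejected
except WatchError:
    print('B retries')             # B loops, re-reads, finds nothing due
print(store.ready)                 # ['job-1']
```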

The foo worker would look like:

while working:
    # BLPOP returns a (key, value) pair, or None when POP_TIMEOUT expires.
    item = redis.blpop('foo_queue', timeout=POP_TIMEOUT)
    if item:
        _, task_data = item
        foo(task_data)

[1] This solution is based on not_a_golfer's answer, the one at http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/, and the Redis documentation on transactions.

Answered Oct 22 '22 17:10 by Dan Benamy


You didn't specify which language you're using. There are at least 3 existing libraries that do this, so you may not need to write a single line of code yourself:

  1. Celery has an optional Redis broker. http://celeryproject.org/

  2. Resque is an extremely popular task queue built on Redis (it is a Ruby library). https://github.com/defunkt/resque

  3. RQ is a simple, small Redis-based queue for Python that aims to "take the good stuff from celery and resque" and be much simpler to work with. http://python-rq.org/

Even if you can't use them, you can at least study their designs.

But to answer your question: what you want can be done with Redis. I've written more or less exactly that in the past.

EDIT: As for modeling what you want in Redis, this is what I would do:

  1. Queuing a task with a timestamp is done directly by the client: you put the task in a sorted set with the timestamp as the score and the task as the value (see ZADD).

  2. A central dispatcher wakes every N seconds and checks the earliest timestamps in this set. If there are tasks ready for execution, it pushes them to a "to be executed NOW" list. ZREVRANGEBYSCORE on the "waiting" sorted set gets all items with timestamp <= now, so you get all the ready items at once. Pushing is done with RPUSH.

  3. Workers use BLPOP on the "to be executed NOW" list, wake when there is something to work on, and do their thing. This is safe because Redis executes commands one at a time, so no two workers will ever pop the same task.

  4. Once finished, the workers put the result in a response queue, which is checked by the dispatcher or another thread. You can also add a "pending" bucket so that a task isn't lost if a worker fails while processing it.
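One way to build the "pending" bucket is the reliable-queue pattern. The dispatcher LPUSHes tasks, and each worker uses RPOPLPUSH (or its blocking form BRPOPLPUSH, available since Redis 2.2) to move a task from the ready list to a pending list in a single atomic step. When the worker is done, it LREMs the task from the pending list, and a recovery process can requeue whatever a crashed worker left behind. Below is a rough in-memory model of that idea. The class and method names are hypothetical and no Redis is involved.

```python
from collections import deque


class ReliableQueue:
    """Hypothetical model of a ready list plus a 'pending' list (no Redis)."""

    def __init__(self):
        self.ready = deque()    # the dispatcher LPUSHes here
        self.pending = deque()  # tasks a worker has taken but not finished

    def push(self, task):
        self.ready.appendleft(task)  # roughly LPUSH ready task

    def take(self):
        # Roughly RPOPLPUSH ready pending: the task moves in one step,
        # so it is always in exactly one of the two lists
        if not self.ready:
            return None
        task = self.ready.pop()
        self.pending.appendleft(task)
        return task

    def ack(self, task):
        self.pending.remove(task)  # roughly LREM pending 1 task

    def requeue_stale(self):
        # Recovery: put tasks left behind by crashed workers back at the head
        # of the ready list. A real system would first check how long each
        # task has been pending before assuming its worker died.
        while self.pending:
            self.ready.append(self.pending.pop())


q = ReliableQueue()
q.push('t1')
q.push('t2')
done = q.take()       # 't1' (FIFO: LPUSH in, RPOPLPUSH out)
q.ack(done)
q.take()              # 't2' is taken, then the worker "crashes"
q.requeue_stale()
print(list(q.ready))  # ['t2']: nothing was lost
```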

So the code will look something like this (this is just pseudocode):

client:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>

dispatcher:

while working:
   tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #only returns tasks with timestamp <= now
   for task in tasks:

       #do the delete and queue as a transaction
       MULTI
       RPUSH "to_be_executed" task
       ZREM "new_tasks" task
       EXEC

   sleep(1)

I didn't add the response queue handling, but it's more or less like the worker:

worker:

while working:
   task = BLPOP "to_be_executed" <TIMEOUT>
   if task:
      response = work_on_task(task)
      RPUSH "results" response

EDIT: stateless atomic dispatcher:

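while working:

   # atomically take the earliest task (lowest score) off the sorted set;
   # the read and the remove must both target rank 0
   MULTI
   ZRANGE "new_tasks" 0 0 WITHSCORES
   ZREMRANGEBYRANK "new_tasks" 0 0
   task = EXEC

   if no task:
       sleep(RESOLUTION)
       continue

   #this is the only risky place (a crash here loses the task) - you can solve it by using Lua internally in 2.6
   SADD "tmp" task

   if task.timestamp <= now:
       MULTI
       RPUSH "to_be_executed" task
       SREM "tmp" task
       EXEC
   else:
       # not due yet: put it back and wait
       MULTI
       ZADD "new_tasks" task.timestamp task
       SREM "tmp" task
       EXEC

       sleep(RESOLUTION)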
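What matters in the stateless dispatcher is that the read and the remove address the same entry: rank 0, the task with the earliest timestamp. Here is a rough in-memory model of one pass. It assumes a sorted Python list of `(timestamp, task)` pairs in place of the zset and leaves out the "tmp" safety set. The function name is hypothetical.

```python
import bisect


def dispatch_once(new_tasks, ready, now):
    """One pass of a hypothetical stateless dispatcher over a toy sorted set.

    new_tasks: list of (timestamp, task) kept sorted, standing in for "new_tasks".
    ready: list standing in for "to_be_executed".
    Returns True if a due task was moved.
    """
    if not new_tasks:
        return False
    # Like ZRANGE 0 0 + ZREMRANGEBYRANK 0 0 in one transaction:
    # take the earliest entry and remove that same entry
    timestamp, task = new_tasks.pop(0)
    if timestamp <= now:
        ready.append(task)                       # like RPUSH to_be_executed
        return True
    bisect.insort(new_tasks, (timestamp, task))  # not due: like ZADD it back
    return False


tasks = sorted([(30, 'b'), (10, 'a')])
ready = []
while dispatch_once(tasks, ready, now=20):
    pass                # keep going while tasks are due, then sleep
print(ready, tasks)     # ['a'] [(30, 'b')]
```

Looping while tasks are due, and sleeping only once the earliest task is in the future, drains a backlog without waiting one RESOLUTION per task.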
Answered Oct 22 '22 16:10 by Not_a_Golfer