I need to design a Redis-driven scalable task scheduling system.
Requirements:
Pseudo-API: schedule_task(timestamp, task_data)
. Timestamp is in integer seconds.
Basic idea:
So far I can't figure out how to fit this in Redis primitives...
Any clues?
Note that there is a similar old question: Delayed execution / scheduling with Redis? In this new question I introduce more details (most importantly, many workers). So far I was not able to figure out how to apply old answers here — thus, a new question.
It is possible to implement a priority queue in Redis using lists and the LPUSH/RPUSH/LPOP/RPOP operations. However, this requires extra work and can be tricky for developers who are new to the Redis project.
Using Redis with Redis Queue allows you to enter those complex tasks into a queue, so the Redis worker executes these tasks outside of your application's HTTP server. In this article, we will build an app that enqueues jobs with Redis queue, performs a function on those jobs and returns the result of the function.
A queue is reliable if it can recover from a failure scenario. If a consumer crashes and the item it was processing is lost, the system is unreliable. A command was added to a previous version of Redis that is tailor-made for this exact situation.
Task queues let applications perform work, called tasks, asynchronously outside of a user request. If an app needs to execute work in the background, it adds tasks to task queues. The tasks are executed later, by worker services. The Task Queue service is designed for asynchronous work.
Here's another solution that builds on a couple of others [1]. It uses the redis WATCH command to remove the race condition without using lua in redis 2.6.
The basic scheme is:
I haven't tested it :-)
The foo job creator would do:
def schedule_task(queue, data, delay_secs):
# This calculation for run_at isn't great- it won't deal well with daylight
# savings changes, leap seconds, and other time anomalies. Improvements
# welcome :-)
run_at = time.time() + delay_secs
# If you're using redis-py's Redis class and not StrictRedis, swap run_at &
# the dict.
redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})
schedule_task('foo_queue', foo_data, 60)
The dispatcher(s) would look like:
while working:
redis.watch(SCHEDULED_ZSET_KEY)
min_score = 0
max_score = time.time()
results = redis.zrangebyscore(
SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
if results is None or len(results) == 0:
redis.unwatch()
sleep(1)
else: # len(results) == 1
redis.multi()
redis.rpush(results[0]['queue'], results[0]['data'])
redis.zrem(SCHEDULED_ZSET_KEY, results[0])
redis.exec()
The foo worker would look like:
while working:
task_data = redis.blpop('foo_queue', POP_TIMEOUT)
if task_data:
foo(task_data)
[1] This solution is based on not_a_golfer's, one at http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/, and the redis docs for transactions.
You didn't specify the language you're using. You have at least 3 alternatives of doing this without writing a single line of code in Python at least.
Celery has an optional redis broker. http://celeryproject.org/
resque is an extremely popular redis task queue using redis. https://github.com/defunkt/resque
RQ is a simple and small redis based queue that aims to "take the good stuff from celery and resque" and be much simpler to work with. http://python-rq.org/
You can at least look at their design if you can't use them.
But to answer your question - what you want can be done with redis. I've actually written more or less that in the past.
EDIT: As for modeling what you want on redis, this is what I would do:
queuing a task with a timestamp will be done directly by the client - you put the task in a sorted set with the timestamp as the score and the task as the value (see ZADD).
A central dispatcher wakes every N seconds, checks out the first timestamps on this set, and if there are tasks ready for execution, it pushes the task to a "to be executed NOW" list. This can be done with ZREVRANGEBYSCORE on the "waiting" sorted set, getting all items with timestamp<=now, so you get all the ready items at once. pushing is done by RPUSH.
workers use BLPOP on the "to be executed NOW" list, wake when there is something to work on, and do their thing. This is safe since redis is single threaded, and no 2 workers will ever take the same task.
once finished, the workers put the result back in a response queue, which is checked by the dispatcher or another thread. You can add a "pending" bucket to avoid failures or something like that.
so the code will look something like this (this is just pseudo code):
client:
ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>
dispatcher:
while working:
tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now
for task in tasks:
#do the delete and queue as a transaction
MULTI
RPUSH "to_be_executed" task
ZREM "new_tasks" task
EXEC
sleep(1)
I didn't add the response queue handling, but it's more or less like the worker:
worker:
while working:
task = BLPOP "to_be_executed" <TIMEOUT>
if task:
response = work_on_task(task)
RPUSH "results" response
EDit: stateless atomic dispatcher :
while working:
MULTI
ZREVRANGE "new_tasks" 0 1
ZREMRANGEBYRANK "new_tasks" 0 1
task = EXEC
#this is the only risky place - you can solve it by using Lua internall in 2.6
SADD "tmp" task
if task.timestamp <= now:
MULTI
RPUSH "to_be_executed" task
SREM "tmp" task
EXEC
else:
MULTI
ZADD "new_tasks" task.timestamp task
SREM "tmp" task
EXEC
sleep(RESOLUTION)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With