Let's say I have a celery task which takes two arguments: X(a,b)
I need to implement custom concurrency logic with the following two rules:
1. Instances of X can run concurrently if they have different values for a. That is, if X(a=1,b=10) is running when X(a=2,b=20) is added to the queue, then the latter is pulled from the queue and executed immediately.

2. Instances of X can NOT run concurrently if they have the same value for a. That is, if X(a=1,b=10) is running when X(a=1,b=20) is added to the queue, then the latter must wait on the queue until the former is done.
Rule #1 comes out of the box with celery by setting worker_concurrency > 1 (docs). Rule #2 is the tricky one.
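For reference, a minimal sketch of the Rule #1 setup (the app name and broker URL here are placeholders, not from my actual project):

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")
# Any value > 1 lets tasks with different arguments run in parallel
# on one worker. Equivalent CLI form:
#   celery -A tasks worker --concurrency=4
app.conf.worker_concurrency = 4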
Distributed task locking, as described in the docs and in this blog, is an approach which gets me close to what I need. There are even libraries out there that implement it for you (celery-singleton). However, looking back at Rule #2, this approach appears to prevent the second task from being queued until the first task completes. I need it to be queued, just not executed on a worker until the first task completes.
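For illustration, this is roughly how celery-singleton is used, as I understand its README (app being your Celery instance):

from celery_singleton import Singleton

@app.task(base=Singleton)
def X(a, b):
    # While an identical X is pending or running, calling X.delay(...)
    # again does not queue a new task; it returns the AsyncResult of
    # the existing one -- i.e. the duplicate is never queued at all,
    # which is the behaviour I need to avoid.
    ...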
Is there any way to implement this? This SO question asks something similar, but there is no answer so far.
This appears to be a good case for using redis and bound celery tasks. You can also use redis as your celery broker, if you don't already do that, and as a caching layer if you need one. It's really a Swiss Army knife. Deploying redis is also quite straightforward, and I highly encourage anyone to get more familiar with it; it is a great tool to have in one's toolbox.
I will change the example a bit, because I am always confused by single-character functions and variables.
# Think of this as X(a,b) from the question
@task
def add(num1, num2):
    return num1 + num2
Then we can upgrade add to look more like this:
# "bind" the task so we have access to all the Task base class functionality
# via "self".
# https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.retry
@task(bind=True)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        # Requeue. Please visit the docs for "retry" mentioned above.
        # There are also max_retries and some other nice things.
        # Note that retry() raises an exception, so nothing after it
        # runs. Try again in 10s.
        raise self.retry(countdown=10)
    return num1 + num2
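To make the intended behaviour concrete, here is a hypothetical queueing sequence (the numbers are arbitrary):

add.delay(1, 10)  # runs immediately
add.delay(2, 20)  # different num1 -> runs concurrently (Rule #1)
add.delay(1, 20)  # same num1 -> queued, but keeps retrying every 10s
                  # until add(1, 10) finishes (Rule #2)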
Our does_running_task_exist_with helper function would then use a redis Set. Like all Set implementations, it guarantees uniqueness, and checking for the existence of a member is fast.
# Using https://github.com/andymccurdy/redis-py
import redis

def does_running_task_exist_with(some_number):
    # Connect to redis.
    # Using database number 2. You might be using db 0 for celery brokerage,
    # and db 1 for celery result storage. Using a separate DB is just nice
    # for isolation. Redis has up to 16.
    # Connects to localhost by default.
    redis_conn = redis.StrictRedis(db=2)

    # We try adding this number to the Set of currently processing numbers.
    # https://redis.io/commands/sadd
    # Return value: the number of elements that were added to the set,
    # not including the elements already present in the set.
    members_added = redis_conn.sadd("manager_task_args", str(some_number))

    # Or shortcut it as "return members_added == 0". This here is
    # more expressive, though.
    if members_added == 0:
        return True
    return False
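A quick illustration of the SADD semantics the helper relies on (this assumes a local redis on db 2, same as above):

import redis

conn = redis.StrictRedis(db=2)
conn.delete("manager_task_args")            # start clean for the demo
print(conn.sadd("manager_task_args", "1"))  # 1 -> "1" was newly added
print(conn.sadd("manager_task_args", "1"))  # 0 -> already a member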
Alright. Now the tracking and decision making is in place. One important thing is still missing: once an add task is done, we need to remove num1 from the redis set. Let's adjust the function a bit.
import redis

@task(bind=True)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        raise self.retry(countdown=10)

    # Do actual work…
    result = num1 + num2

    # Cleanup
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args", str(num1))

    return result
But what if things go wrong? What if the addition fails? Then our num1 would never be removed from the Set, and tasks with the same num1 would keep getting requeued, so our queue just grows longer and longer. We don't want that. You can do two things here: either create a class-based task with an on_failure method, or wrap the work in a try-finally. We will go the try-finally route, because it is easier to follow in this case (a sketch of the class-based alternative follows after the code):
import redis

@task(bind=True)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        raise self.retry(countdown=10)

    try:
        result = num1 + num2
    finally:
        # Runs on success and on failure alike
        redis_conn = redis.StrictRedis(db=2)
        redis_conn.srem("manager_task_args", str(num1))

    return result
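For completeness, a rough sketch of the class-based alternative mentioned above. "TrackedTask" is a made-up name, and the success path still has to clean up after itself, because on_failure only fires when the task raises an unhandled exception (a retry triggers on_retry instead):

import redis
from celery import Task

class TrackedTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # args[0] is num1 for the add task below
        redis_conn = redis.StrictRedis(db=2)
        redis_conn.srem("manager_task_args", str(args[0]))

@task(bind=True, base=TrackedTask)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        raise self.retry(countdown=10)
    result = num1 + num2
    # Success path cleanup
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args", str(num1))
    return result

One caveat with this sketch: if a task exhausts its retries, on_failure fires even though that run never held the lock, which would clear another run's entry. The try-finally version above avoids that, which is another reason I prefer it here.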
That should do it. Note that you might also want to look into redis connection pooling if you will have tons of tasks.
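As a sketch of that, assuming the same localhost/db-2 setup used throughout:

import redis

# One module-level pool shared by all tasks in the worker process,
# instead of opening a fresh connection per call.
pool = redis.ConnectionPool(host="localhost", port=6379, db=2)

def get_redis():
    return redis.StrictRedis(connection_pool=pool)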