 

Prevent celery task from starting until a different task with similar arguments is done

Let's say I have a celery task which takes two arguments: X(a,b)

I need to implement custom concurrency logic with the following two rules:

  1. Instances of X can run concurrently if they have different values for a. That is, if X(a=1,b=10) is running when X(a=2,b=20) is added to the queue, then the latter is pulled from the queue and executed immediately.

  2. Instances of X can NOT run concurrently if they have the same values for a. That is, if X(a=1,b=10) is running when X(a=1,b=20) is added to the queue, then the latter must wait on the queue until the former is done.

Rule #1 comes out of the box with celery by setting worker_concurrency>1 (docs). Rule #2 is the tricky one.
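For reference, Rule #1 is just a configuration setting. A minimal sketch, assuming app is your Celery instance:

# Allow up to 4 tasks to run at once on this worker.
# Equivalent to starting the worker with: celery -A proj worker --concurrency=4
app.conf.worker_concurrency = 4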

Distributed task locking, as described in the docs and in this blog, is an approach which gets me close to what I need. There are even libraries out there that implement it for you (celery-singleton). However, looking back at Rule #2, this approach appears to prevent the second task from being queued until the first task completes. I need it to be queued, just not executed on a worker until the first task completes.
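For illustration, the locking pattern looks roughly like this when using the celery-singleton package (a sketch; app is assumed to be your Celery instance):

from celery_singleton import Singleton

# While an identical call is queued or running, submitting it again does
# not enqueue a second task at all. Rule #2 instead needs the duplicate
# queued but held back from execution.
@app.task(base=Singleton)
def x(a, b):
    ...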

Is there any way to implement this? This SO question asks something similar, but there is no answer so far.

asked Nov 06 '22 by Johnny Metz

1 Answer

This appears to be a good case for using redis and bound celery tasks. You can also use redis as your celery broker, if you don't already do that, and as a caching layer if you need one. It's really a Swiss Army knife. Deploying redis is also quite straightforward, and I highly encourage anyone to get more familiar with it. It is a great tool to have in one's toolbox.
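For context, wiring celery to redis might look like this (a minimal sketch; URLs and database numbers are illustrative):

from celery import Celery

# redis as broker (db 0) and result backend (db 1). The helper further
# down uses db 2, so the bookkeeping stays isolated from both.
app = Celery(
    "proj",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)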

I will change the example a bit, because I am always confused by single-character functions and variables.

# Think of this as X(a,b) from the question.
# ("task" here is assumed to be your Celery app's task decorator,
# e.g. app.task.)
@task
def add(num1, num2):
    return num1 + num2

Then we can upgrade add to look more like this:

# "bind" the task so we have access to all the Task base class functionality
# via "self".
# https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.retry
@task(bind=True)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        # requeue. Please visit the docs for "retry" mentioned above.
        # There are also max_retries and some other nice things.
        # Try again in 10s
        self.retry(countdown=10)
        return
    return num1 + num2
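To illustrate the intended behavior (hypothetical usage, assuming a worker is running):

# Both calls are queued immediately. The second one starts, sees that a
# task with the same num1 is still running, and retries every 10s until
# the first one finishes.
add.delay(1, 10)
add.delay(1, 20)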

Our does_running_task_exist_with helper function will use a redis Set. Like all Set implementations, it guarantees uniqueness, and checking whether a member exists is fast.

# Using https://github.com/andymccurdy/redis-py
import redis

def does_running_task_exist_with(some_number):
    # Connect to redis.
    # Using database number 2. You might be using db 0 for the celery broker
    # and db 1 for celery result storage. Using a separate DB is just nice
    # for isolation. Redis ships with 16 databases by default.
    # Connects to localhost by default.
    redis_conn = redis.StrictRedis(db=2)
    # We try adding this number to the Set of currently processing numbers.
    # https://redis.io/commands/sadd
    # Return value: the number of elements that were added to the set,
    # not counting elements already present in the set.
    members_added = redis_conn.sadd("manager_task_args", str(some_number))
    # Could be shortened to "return members_added == 0", but this is
    # more expressive.
    if members_added == 0:
        return True
    return False

Alright. Now the tracking and decision-making are in place. One important thing is still missing: once an add task is done, we need to remove its num1 from the redis set. Let's adjust the function a bit.

import redis

@task(bind=True)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        raise self.retry(countdown=10)
    # Do actual work…
    result = num1 + num2
    # Cleanup
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args", str(num1))
    return result

But what if things go wrong? What if the addition fails? Then our num1 would never be removed from the Set, and tasks with that number would retry forever while the queue keeps growing. We don't want that. You can do two things here: either create a class-based task with an on_failure method (a sketch of that follows the next code block), or wrap the work in a try/finally. We will go the try/finally route, because it is easier to follow in this case:

import redis

@task(bind=True)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        raise self.retry(countdown=10)
    try:
        result = num1 + num2
    finally:
        # Runs whether the addition succeeded or raised.
        redis_conn = redis.StrictRedis(db=2)
        redis_conn.srem("manager_task_args", str(num1))
    return result
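For completeness, the class-based route mentioned above might look roughly like this (a sketch with a hypothetical CleanupTask base class; note that Celery calls on_failure when a task fails, not on each retry):

from celery import Task
import redis

class CleanupTask(Task):
    # on_failure receives the original task args, so we can remove the
    # tracked number even when the task body raises.
    # Caveat: exhausting max_retries also counts as a failure, which
    # would remove an entry this task never added, so a production
    # version would need a guard for that case.
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        redis_conn = redis.StrictRedis(db=2)
        redis_conn.srem("manager_task_args", str(args[0]))

@task(bind=True, base=CleanupTask)
def add(self, num1, num2):
    if does_running_task_exist_with(num1):
        raise self.retry(countdown=10)
    result = num1 + num2
    # Cleanup on the success path; on_failure covers the failure path.
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args", str(num1))
    return result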

That should do it. Note that you might also want to look into redis connection pooling if you will have tons of tasks.
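A minimal pooling sketch (names are illustrative):

import redis

# One pool per worker process; connections are reused instead of being
# opened from scratch on every task run.
REDIS_POOL = redis.ConnectionPool(db=2)

def get_redis_conn():
    return redis.StrictRedis(connection_pool=REDIS_POOL)

The helpers above could then call get_redis_conn() instead of constructing StrictRedis(db=2) each time.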

answered Nov 12 '22 by Luis Nell