 

Debounce Celery tasks?

Tags:

python

celery

Is there a standard method for debouncing Celery tasks?

For example, so that a task can be "started" multiple times, but will only be run once after some delay:

def debounce_task(task):
    if task_is_queued(task):  # hypothetical check: is a run already pending?
        return
    task.apply_async(countdown=30)
David Wolever asked Jan 26 '15 18:01



2 Answers

Here's how we do it with Redis counters. All of this could probably be generalized into a decorator, but we only use it for one specific task (webhooks).

Your public-facing task is the one you call from other functions. It increments a key in Redis. The key is built from the function's arguments, whatever they may be, so each distinct set of arguments gets its own counter.
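The answer doesn't show make_public_task_cache_key. Here is a minimal sketch of one possible version, assuming every argument is JSON-serializable; the body and the "debounce:" prefix are illustrative choices, not the answerer's actual helper:

```python
import hashlib
import json


def make_public_task_cache_key(*args, **kwargs):
    """Build a stable Redis key from a task's arguments.

    Hypothetical implementation: assumes all arguments are JSON-serializable.
    sort_keys=True makes the key independent of keyword-argument order.
    """
    payload = json.dumps([args, kwargs], sort_keys=True)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return "debounce:" + digest
```

Calls with the same arguments map to the same counter, and hashing keeps the key short even when the arguments are large.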

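@task
def your_public_task(*args, **kwargs):
    # make_public_task_cache_key and get_redis are the answerer's own helpers.
    cache_key = make_public_task_cache_key(*args, **kwargs)
    get_redis().incr(cache_key)  # count this call
    # Schedule the real task; apply_async is needed to pass a countdown.
    _your_task.apply_async(args=args, kwargs=kwargs, countdown=settings.QUEUE_DELAY)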

Note that both tasks use the same cache key function (they must build the same key), and note the countdown setting, which defines the debounce window.

Then, the actual task executing the code does the following:

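@task
def _your_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    # Atomically read the pending-call count and reset it to 0.
    counter = get_redis().getset(cache_key, 0)
    # Redis returns the old value as a string (bytes on Python 3), or None if
    # the key was missing; 0 means an earlier run already did the work.
    if counter is None or int(counter) == 0:
        return

    # ... execute your actual task code.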

This lets you call your_public_task.delay(...) as many times as you want within your QUEUE_DELAY, and the actual work will only run once.
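To see why only one run does the work, here is a self-contained simulation of the counter logic. FakeRedis is a hypothetical in-memory stand-in for the two Redis commands the answer uses (INCR and GETSET), and the plain list scheduled stands in for Celery's countdown; nothing here talks to Redis or Celery:

```python
class FakeRedis:
    """Hypothetical in-memory stand-in for the Redis commands used in the answer."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        # Like INCR: a missing key counts as 0; values are stored as bytes.
        value = int(self.data.get(key, b"0")) + 1
        self.data[key] = str(value).encode()
        return value

    def getset(self, key, value):
        # Like GETSET: store the new value, return the old one (None if missing).
        old = self.data.get(key)
        self.data[key] = str(value).encode()
        return old


fake_redis = FakeRedis()
scheduled = []  # stands in for runs queued with countdown=QUEUE_DELAY
executed = []   # records which runs actually did the work


def your_public_task(*args):
    cache_key = "debounce:" + repr(args)
    fake_redis.incr(cache_key)
    scheduled.append(args)


def _your_task(*args):
    cache_key = "debounce:" + repr(args)
    counter = fake_redis.getset(cache_key, 0)
    if counter is None or int(counter) == 0:
        return
    executed.append(args)


# Five calls arrive within one QUEUE_DELAY window...
for _ in range(5):
    your_public_task("webhook-42")

# ...then the worker runs all five scheduled copies.
for args in scheduled:
    _your_task(*args)

print(executed)  # [('webhook-42',)]
```

In this model the work happens on the first scheduled run, i.e. roughly QUEUE_DELAY after the first call of a burst rather than after the last one.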

Bartek answered Sep 18 '22 04:09



Here's how you can do it with Mongo.

NOTE: I had to make the design a little more forgiving, as Celery tasks aren't guaranteed to execute at the exact moment the eta is reached or the countdown runs out.

Also, MongoDB only removes documents via expiring (TTL) indexes about once a minute, so you can't base the design on records being deleted the moment the eta is up.

Anyhow, the flow is something like this:

  1. Client code calls my_task.
  2. preflight increments a call counter, and returns it as flight_id
  3. _my_task is set to be executed after TTL seconds.
  4. When _my_task runs, it checks whether its flight_id is still current. If it's not, it aborts.
  5. ... sometime later, Mongo cleans up stale entries in the collection via an expiring index.
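Steps 1 to 4 can be simulated without MongoDB or Celery. In this sketch FakeInflight is a hypothetical in-memory stand-in for the Mongo collection (only the flightId counter is modelled, not eta or the expiring index), and the pending list stands in for tasks scheduled with countdown=TTL:

```python
class FakeInflight:
    """Hypothetical in-memory stand-in for the Mongo 'in flight' collection."""

    def __init__(self):
        self.flight_ids = {}

    def preflight(self, fn, key):
        # Mirrors the upsert + $inc: a missing record starts at 0 and becomes 1.
        self.flight_ids[(fn, key)] = self.flight_ids.get((fn, key), 0) + 1
        return self.flight_ids[(fn, key)]

    def check_for_takeoff(self, fn, key, flight_id):
        # A missing (expired) record counts as "go", as in the answer's code.
        current = self.flight_ids.get((fn, key))
        return current is None or current == flight_id


inflight = FakeInflight()
pending = []  # flight ids of tasks waiting out their countdown
ran = []      # flight ids that passed the takeoff check

# Three calls in quick succession for the same argument.
for _ in range(3):
    pending.append(inflight.preflight("my_task", "arg-1"))

# After TTL the delayed tasks run; only the newest flight proceeds.
for flight_id in pending:
    if inflight.check_for_takeoff("my_task", "arg-1", flight_id):
        ran.append(flight_id)

print(ran)  # [3]
```

Because the newest flight wins, the work runs about TTL after the last call of a burst, which is the usual trailing-edge notion of debouncing.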

@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
    # HASH() is a placeholder for whatever turns my_arg into a stable key.
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
    _my_task.apply_async((my_arg,), {'flight_id': flight_id}, countdown=TTL)

@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
    # Abort if a newer call has superseded this one.
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
        return
    # ... actual work ... #

Library code:

import datetime

import pymongo

TTL = 5 * 60     # Run tasks after 5 minutes
EXPIRY = 6 * TTL # This needs to be much larger than TTL.

# We need to store a list of task-executions currently pending.
# `db` is assumed to be a pymongo Database object.
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING),
                                  ('key', pymongo.ASCENDING)])
# Expiring (TTL) index: MongoDB deletes a record EXPIRY seconds after its 'eta'.
inflight_collection.create_index('eta', expireAfterSeconds=EXPIRY)


def preflight(collection, fn, key, ttl):
    # pymongo treats naive datetimes as UTC, so build the eta in UTC.
    eta = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=ttl)
    result = collection.find_one_and_update({
        'fn': fn,
        'key': key,
    }, {
        '$set': {
            'eta': eta
        },
        '$inc': {
            'flightId': 1
        }
    }, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
    print('Preflight[{}][{}] = {}'.format(fn, key, result['flightId']))
    return result['flightId']


def check_for_takeoff(collection, fn, key, flight_id):
    result = collection.find_one({
        'fn': fn,
        'key': key
    })
    # A missing record (already expired) counts as "go".
    ready = result is None or result['flightId'] == flight_id
    current = result['flightId'] if result is not None else None
    print('Check[{}][{}] = {}, {}'.format(fn, key, current, ready))
    return ready
nathan-m answered Sep 21 '22 04:09
