Is there a standard method for debouncing Celery tasks?
For example, so that a task can be "started" multiple times, but will only be run once after some delay:
    def debounce_task(task):
        if task_is_queued(task):
            return
        task.apply_async(countdown=30)
Here's how we do it with Redis counters. All of this could probably be generalized into a decorator, but we only use it for one specific task (webhooks).
Your public-facing task is what you call from other functions. It needs to increment a key in Redis. The key is built from the arguments of your function, whatever they may be, which ensures each distinct set of arguments gets its own counter.
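    @task
    def your_public_task(*args, **kwargs):
        cache_key = make_public_task_cache_key(*args, **kwargs)
        # Count this call so the delayed task knows there is work pending.
        get_redis().incr(cache_key)
        # Schedule the real task; calling it directly would run it immediately.
        _your_task.apply_async(args=args, kwargs=kwargs,
                               countdown=settings.QUEUE_DELAY)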
Note that the cache key function is shared (you want the same cache key in both tasks), and note the countdown setting.
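The answer does not show make_public_task_cache_key, so here is one way it might look. This is only a sketch: the "debounce:your_task:" prefix and the JSON plus SHA-1 approach are my assumptions, not part of the original code. The only real requirement is that the same arguments always produce the same key.

```python
import hashlib
import json


def make_public_task_cache_key(*args, **kwargs):
    """Build a deterministic Redis key from a task's arguments.

    Hypothetical helper: the key prefix and the hashing scheme are
    illustrative choices, not taken from the original answer.
    """
    # sort_keys makes keyword-argument order irrelevant; default=str
    # stringifies values that JSON cannot encode natively.
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return "debounce:your_task:" + digest
```

Hashing keeps the key short even when the arguments are large, at the cost of keys that are not human-readable.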
Then, the actual task executing the code does the following:
    @task
    def _your_task(*args, **kwargs):
        cache_key = make_public_task_cache_key(*args, **kwargs)
        # Atomically read the counter and reset it to zero.
        counter = get_redis().getset(cache_key, 0)
        # Redis returns the value as a string (or bytes), or None if unset.
        if counter is None or int(counter) == 0:
            return
        ...  # execute your actual task code
This lets you hit your_public_task.delay(..) as many times as you want within your QUEUE_DELAY, and it'll only fire off once.
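To see why only one run does any work, here is a small simulation of the counter logic with no Redis or Celery involved. FakeRedis, the pending list and the "webhook:42" key are stand-ins I made up for illustration; only incr and getset mirror real Redis commands.

```python
class FakeRedis:
    """In-memory stand-in offering only the two commands the pattern uses."""

    def __init__(self):
        self._data = {}

    def incr(self, key):
        self._data[key] = str(int(self._data.get(key, "0")) + 1)
        return int(self._data[key])

    def getset(self, key, value):
        old = self._data.get(key)
        self._data[key] = str(value)
        return old


redis = FakeRedis()
pending = []   # stands in for the Celery queue of delayed _your_task runs
executed = []  # records each time the real work runs


def your_public_task(key):
    redis.incr(key)
    pending.append(key)  # real code: _your_task.apply_async(..., countdown=...)


def _your_task(key):
    counter = redis.getset(key, 0)
    if counter is None or int(counter) == 0:
        return  # an earlier run already handled this burst
    executed.append(key)


for _ in range(3):
    your_public_task("webhook:42")

# Later, once the countdown has elapsed, the worker runs every queued task.
for key in pending:
    _your_task(key)

print(executed)  # ['webhook:42']
```

The first delayed run resets the counter and does the work; the other two see zero and return early.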
Here's how you can do it with Mongo.
NOTE: I had to make the design a little more forgiving, as Celery tasks aren't guaranteed to execute the exact moment the eta is met or the countdown runs out.
Also, Mongo expiring indexes are only cleaned up every minute or so, so you can't base the design around records being deleted the moment the eta is up.
Anyhow, the flow is something like this:
1. my_task calls preflight, which increments a call counter and returns it as flight_id.
2. _my_task is set to be executed after TTL seconds.
3. When _my_task runs, it checks if its flight_id is still current. If it's not, it aborts.

    @celery.task(track_started=False, ignore_result=True)
    def my_task(my_arg):
        flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
        _my_task.apply_async((my_arg,), {'flight_id': flight_id}, countdown=TTL)

    @celery.task(track_started=False, ignore_result=True)
    def _my_task(my_arg, flight_id=None):
        if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
            return
        # ... actual work ... #
Library code:
    import datetime

    import pymongo

    TTL = 5 * 60  # Run tasks after 5 minutes
    EXPIRY = 6 * TTL  # This needs to be much larger than TTL.

    # We need to store a list of task-executions currently pending.
    # `db` is assumed to be an existing pymongo Database handle.
    inflight_collection = db['celery_In_Flight']
    inflight_collection.create_index([('fn', pymongo.ASCENDING),
                                      ('key', pymongo.ASCENDING)])
    # The TTL index option is spelled expireAfterSeconds.
    inflight_collection.create_index('eta', expireAfterSeconds=EXPIRY)

    def preflight(collection, fn, key, ttl):
        # TTL indexes compare against UTC, so compute the eta in UTC.
        eta = (datetime.datetime.now(datetime.timezone.utc)
               + datetime.timedelta(seconds=ttl))
        # Atomically bump the counter and push the expiry date forward.
        result = collection.find_one_and_update({
            'fn': fn,
            'key': key,
        }, {
            '$set': {
                'eta': eta
            },
            '$inc': {
                'flightId': 1
            }
        }, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
        print('Preflight[{}][{}] = {}'.format(fn, key, result['flightId']))
        return result['flightId']

    def check_for_takeoff(collection, fn, key, flight_id):
        result = collection.find_one({
            'fn': fn,
            'key': key
        })
        # A missing record (already expired) also counts as ready.
        current = None if result is None else result['flightId']
        ready = current is None or current == flight_id
        print('Check[{}][{}] = {}, {}'.format(fn, key, current, ready))
        return ready
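The flight_id check can be tried out without Mongo or Celery. The sketch below swaps the collection for a plain dict; fake_preflight, fake_check_for_takeoff and the "arg-hash" key are hypothetical names for this illustration, and the eta/expiry handling is left out entirely.

```python
flights = {}  # (fn, key) -> latest flight id; stands in for the Mongo collection


def fake_preflight(fn, key):
    """Increment the call counter for (fn, key) and return the new value."""
    flights[(fn, key)] = flights.get((fn, key), 0) + 1
    return flights[(fn, key)]


def fake_check_for_takeoff(fn, key, flight_id):
    """A run may proceed only if no newer call has bumped the counter."""
    current = flights.get((fn, key))
    return current is None or current == flight_id


# Three calls in quick succession each receive their own flight id.
scheduled = [fake_preflight("my_task", "arg-hash") for _ in range(3)]

# When the delayed runs fire, only the most recent flight id is still current.
ran = [fid for fid in scheduled
       if fake_check_for_takeoff("my_task", "arg-hash", fid)]

print(scheduled, ran)  # [1, 2, 3] [3]
```

Because the last call wins, the work runs TTL seconds after the most recent call rather than after the first one.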