
Allow a task execution if it's not already scheduled using celery

I'm using Celery to handle task scheduling in a Django app I'm developing; I'm working with the Django database backend just for testing.

I've tried several things to run a task only if it's not already scheduled or in progress, like the approach proposed in this article, but nothing has worked so far.

Something like this:

task.py

from celery import task

@task()
def add(x, y):
    return x + y

And then call it twice, like this:

from myapp.tasks import add

add.apply_async((2, 2), task_id='1', countdown=15)  # task_id must be a string
add.apply_async((2, 2), task_id='2', countdown=15)

Only one instance should be allowed to run, based on the countdown=15. How can I make the second call never execute if another instance is already running or waiting?

Victor Sigler asked Aug 31 '15 22:08

People also ask

How does Celery execute tasks?

The process of task execution by Celery can be broken down into three steps: your application sends the task to the task broker; the task is then reserved by a worker for execution; and finally the result of the task execution is stored in the result backend.

How do you schedule celery tasks?

A task is just a Python function. You can think of scheduling a task as a time-delayed call to the function. For example, you might ask Celery to call your function task1 with arguments (1, 3, 3) after five minutes. Or you could have your function batchjob called every night at midnight.

What is Apply_async in Celery?

`apply_async(args[, kwargs[, …]])` sends a task message. `delay(*args, **kwargs)` is a shortcut to send a task message, but doesn't support execution options. Calling the task directly (`__call__`) executes it in the current process instead of sending a message to a worker.

What happens when a Celery task fails?

Celery will stop retrying after 7 failed attempts and raise an exception.
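The retry loop described above can be modeled in plain Python (this is an illustration of the "give up after N failed retries" behavior, not Celery's actual implementation; the exception class stands in for `celery.exceptions.MaxRetriesExceededError`):

```python
class MaxRetriesExceededError(Exception):
    """Stand-in for celery.exceptions.MaxRetriesExceededError."""

def run_with_retries(func, max_retries=7):
    """Call func, retrying on failure; raise after max_retries failed retries."""
    failures = 0
    while True:
        try:
            return func()
        except Exception:
            failures += 1
            if failures > max_retries:
                # initial attempt + max_retries retries have all failed
                raise MaxRetriesExceededError(
                    "gave up after %d attempts" % failures)
```

With `max_retries=7` the function is attempted 8 times in total (the initial call plus 7 retries) before the exception propagates.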


1 Answer

One problem with the accepted answer is that it is slow. Checking whether a task is already running involves making a call to the broker and then iterating through both the scheduled and active tasks. If you want to queue up the task fast, this won't work. The current solution also has a small race condition: two processes could check whether the task has been queued at the same time (each finding it isn't), and then both queue a task.

A better solution is what I call debounced tasks. Basically you increment a counter each time you queue a task, and when the task starts you decrement it. Use Redis and it's all atomic.

e.g.

Queue up the task:

conn = get_redis()  # helper assumed to return a redis.Redis connection
conn.incr(key)      # atomically count this queued request
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

Then in the task you have two options: do you want to execute the task 15 seconds after the first one was queued (throttle), or 15 seconds after the last one was queued (debounce)? That is, if we keep trying to run the same task, do we extend the timer, or do we just wait 15 seconds for the first one and ignore the other tasks that were queued?

It's easy to support both. Here is debounce, where we wait until the task stops getting queued:

conn = get_redis()
counter = conn.decr(key)
if counter > 0:
    # a newer instance is still queued; let it do the work
    return
# continue on to rest of task
# continue on to rest of task
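The two snippets above rely on the answer's own `get_redis()` helper (presumably returning a `redis.Redis` connection). To show the whole debounce flow end to end without a Redis server, here is a sketch that swaps in a minimal in-memory stand-in with the same atomic `incr`/`decr` semantics; in real use you would pass a real connection and call `task.apply_async(..., countdown=15)` instead of `run_task()`:

```python
class FakeRedis:
    """In-memory stand-in exposing the two atomic ops the pattern needs."""
    def __init__(self):
        self.data = {}
    def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]
    def decr(self, key):
        self.data[key] = self.data.get(key, 0) - 1
        return self.data[key]

def enqueue_add(conn, run_task):
    # producer side: count the pending request, then queue the task
    conn.incr("add:pending")
    run_task()  # stands in for task.apply_async(..., countdown=15)

def debounced_add(conn, x, y):
    # task body: only the last queued instance does the work
    if conn.decr("add:pending") > 0:
        return None  # a newer instance is still queued; skip
    return x + y
```

Queue the task three times and run the three resulting task instances: the first two see a positive counter and skip, and only the last one actually computes the result.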

Throttle version:

counter = conn.getset(key, '0')  # atomically read the old value and reset it to '0'
if counter == '0':
    # we already ran, so ignore all the tasks that were queued since
    return
# continue on to task
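What makes this safe is that GETSET returns the old value and writes the new one in a single atomic step, so only one task instance can observe a non-zero counter. A dict-based sketch of just the task-side check (real code would use `redis.Redis.getset` on a shared connection, as above):

```python
def throttled_add(store, x, y, key="add:pending"):
    # emulate Redis GETSET: read the old value and write '0' in one step
    old, store[key] = store.get(key, "0"), "0"
    if old == "0":
        return None  # an instance already ran; drop this one
    return x + y
```

After a burst of enqueues has bumped the counter, the first task instance to run resets it and does the work; every later instance sees `'0'` and returns immediately.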

Another benefit of this solution over the accepted is that the key is entirely under your control. So if you want the same task to be executing but only once for different id/objects for example, you incorporate that into your key.

Update

Thinking about this even more: you can do the throttle version even more easily, without queuing up redundant tasks at all.

Throttle v2 (when queuing up the task)

conn = get_redis()
counter = conn.incr(key)
if counter == 1:
    # queue up the task only the first time
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

Then in the task you set the counter back to 0.
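Both halves of this variant can be sketched together. As before, the in-memory class is only a stand-in so the example runs without a server; `queued.append(...)` stands in for `task.apply_async(..., countdown=15)`:

```python
class FakeRedis:
    """In-memory stand-in for the atomic Redis ops used here."""
    def __init__(self):
        self.data = {}
    def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]
    def set(self, key, value):
        self.data[key] = value

queued = []  # records each apply_async call for illustration

def enqueue_throttled(conn, key="add:pending"):
    # only the first caller in a window actually queues the task
    if conn.incr(key) == 1:
        queued.append(key)  # stands in for task.apply_async(countdown=15)

def add_task(conn, x, y, key="add:pending"):
    conn.set(key, 0)  # reopen the window for the next burst
    return x + y
```

Five rapid enqueues produce exactly one queued task; once the task runs and resets the counter, the next enqueue queues again.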

You don't even have to use a counter: if you use a Redis set, you can SADD the key to the set. If you get back 1, the key wasn't in the set, so queue the task. If you get back 0, the key is already in the set, so don't queue it.
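A sketch of that set-based variant, emulating SADD's return value on a plain Python set (a real implementation would use `redis.Redis.sadd`, which likewise returns the number of members added, and SREM at the start of the task so later bursts can queue again):

```python
def sadd(pending, key):
    """Emulate Redis SADD on a Python set: 1 if added, 0 if already there."""
    if key in pending:
        return 0
    pending.add(key)
    return 1

def enqueue_once(pending, key, run_task):
    if sadd(pending, key) == 1:
        run_task()  # key was new: queue the task

def start_task(pending, key):
    pending.discard(key)  # SREM: let the next burst queue a fresh task
```

Repeated enqueues while the key is in the set are dropped; once the task starts and removes the key, the next enqueue goes through.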

dalore answered Oct 14 '22 09:10