I'm using Celery to handle task scheduling in a Django app I'm developing. I'm working with the Django database just for testing.
I've tried several approaches to execute a task only if it isn't already scheduled or in progress, like the one proposed in this article, but nothing has worked so far.
Something like this:
tasks.py
from celery import task

@task()
def add(x, y):
    return x + y
And then you call it twice, like this:
import myapp.tasks

myapp.tasks.add.apply_async((2, 2), task_id='1', countdown=15)
myapp.tasks.add.apply_async((2, 2), task_id='2', countdown=15)
Only one instance should be allowed to run, based on the countdown=15. How can I make sure the second call never executes if another one is already running or waiting?
The process of task execution in Celery can be broken down into three steps: your application sends the task to the task broker, a worker then reserves and executes it, and finally the result of the execution is stored in the result backend.
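As a minimal sketch of those three pieces (the Redis URLs are placeholders; substitute your own broker and result backend):

from celery import Celery

# One Redis database as the broker, another as the result backend.
app = Celery('myapp',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')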
A task is just a Python function. You can think of scheduling a task as a time-delayed call to the function. For example, you might ask Celery to call your function task1 with arguments (1, 3, 3) after five minutes. Or you could have your function batchjob called every night at midnight.
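Continuing the sketch above, the nightly batchjob could be wired up with Celery beat; the task path myapp.tasks.batchjob is illustrative:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'nightly-batchjob': {
        'task': 'myapp.tasks.batchjob',         # assumed task path
        'schedule': crontab(hour=0, minute=0),  # every night at midnight
    },
}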
There are three ways to call a task:
apply_async(args[, kwargs[, ...]]): sends a task message.
delay(*args, **kwargs): shortcut to send a task message, but doesn't support execution options.
calling (__call__): applies the task in the current process, so no task message is sent and no worker is involved.
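For the add task defined above, the three styles look like this:

add.apply_async((2, 2), countdown=15)  # send a message with execution options
add.delay(2, 2)                        # shortcut, equivalent to apply_async((2, 2))
add(2, 2)                              # runs in the current process; no message is sent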
Celery will stop retrying after 7 failed attempts and raise an exception.
One problem with the accepted answer is that it is slow. Checking whether a task is already running involves a call to the broker and then iterating through both the scheduled and the active tasks. If you want to queue the task quickly, this won't work. The current solution also has a small race condition: two processes could check whether the task has been queued at the same time (and both find that it isn't), and then both queue it.
A better solution is what I call debounced tasks. Basically, you increment a counter each time you queue a task, and decrement it when the task starts. Use Redis and it's all atomic.
e.g.
Queue up the task:
conn = get_redis()
conn.incr(key)  # atomically record one more pending run
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
Then in the task you have two options: do you want to execute the task 15 seconds after the first one was queued (throttle), or 15 seconds after the last one was queued (debounce)? That is, if we keep trying to run the same task, do we extend the timer, or do we just wait 15 seconds from the first one and ignore the other tasks that were queued in the meantime?
It's easy to support both. Here is debounce, where we wait until the task stops getting queued:
conn = get_redis()
counter = conn.decr(key)
if counter > 0:
    # more recent runs are still queued behind this one
    return
# continue on to the rest of the task
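Putting the two halves together, an end-to-end sketch of the debounce pattern might look like this; get_redis() and the key name are assumptions, not part of any particular codebase:

import redis

from celery import shared_task

def get_redis():
    # assumed helper: a shared Redis connection
    return redis.Redis()

KEY = 'debounce:send_notifications'

def queue_send_notifications():
    conn = get_redis()
    conn.incr(KEY)  # one more pending run
    send_notifications.apply_async(countdown=15)

@shared_task
def send_notifications():
    conn = get_redis()
    if conn.decr(KEY) > 0:
        # more runs were queued after this one; let the last one do the work
        return
    ...  # actual work here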
Throttle version:
conn = get_redis()
counter = conn.getset(key, 0)
if counter is not None and int(counter) == 0:
    # we already ran, so ignore all the tasks that were queued since
    return
# continue on to the rest of the task
Another benefit of this solution over the accepted answer is that the key is entirely under your control. So if you want the same task to execute only once per id/object, for example, you can incorporate that id into the key.
Update
Thinking about this some more, you can do the throttle version even more simply, without queuing redundant tasks.
Throttle v2 (when queuing up the task)
conn = get_redis()
counter = conn.incr(key)
if counter == 1:
    # queue up the task only the first time
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
Then in the task you set the counter back to 0.
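In the task, that reset is a one-liner (same assumed key as above):

conn = get_redis()
conn.set(key, 0)  # allow the next incr to queue the task again
# ... actual work here ...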
You don't even have to use a counter: if you use a set instead, you can add the key to the set. If sadd returns 1, the key wasn't in the set and you should queue the task; if it returns 0, the key is already in the set, so don't queue it.
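A sketch of the set-based variant (the set name 'queued-tasks' is an assumption):

conn = get_redis()
if conn.sadd('queued-tasks', key):  # 1 means newly added, so nothing queued yet
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

# and in the task itself, remove the key so it can be queued again:
conn.srem('queued-tasks', key)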