I have Celery beat running scheduled tasks every 30 seconds. One task runs daily, and another runs weekly at a user-specified time and day of the week. It checks the "start time" and the "next scheduled date". The next scheduled date does not update until the task is completed.
However, I want to know how to make sure that Celery beat runs the task only once. Right now, Celery runs a given task multiple times until that task's next scheduled date has been updated.
To do that you need to implement some kind of "distributed lock". An easy and reliable approach is to use the Django cache with a memcached backend: set a "flag" in it when the task starts, then remove that flag just before the task finishes. Another option is to use a Redis lock as the "distributed lock". Example using the Django cache with memcached as the backend:
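from celery import shared_task
from django.core.cache import cache


@shared_task
def my_task(arg1, arg2, lock_expire=300):
    lock_key = 'my_task'
    # cache.add() only sets the key if it does not exist yet,
    # so only one caller gets True until the key is deleted or expires
    acquire_lock = lambda: cache.add(lock_key, '1', lock_expire)
    release_lock = lambda: cache.delete(lock_key)
    if acquire_lock():
        try:
            # Execute your code here!
            pass
        except Exception:
            # error handling here (re-raise so the failure is not swallowed)
            raise
        finally:
            # release the lock to allow other tasks to execute
            release_lock()
    else:
        print("Other task is running, skipping")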
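The code above implements a "distributed lock" to ensure that only one instance of the task runs at a time, no matter how many times you try to execute it again. The lock can be acquired by only one task; the others just skip the "main block" and finish. The lock_expire value is a safety net: if a worker dies before releasing the lock, the flag expires after that many seconds, so set it longer than the task's expected run time. Does it make sense to you?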
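For the Redis option mentioned above, here is a minimal sketch of the same idea, assuming the redis-py client and its built-in lock. The helper name run_exclusively and its parameters are made up for illustration, and the client is passed in so you can plug in whatever connection you already have. Treat it as a starting point, not a finished implementation.

```python
def run_exclusively(client, lock_name, work, lock_expire=300):
    """Run work() only if the named lock is free. Return True if it ran.

    `client` is assumed to be a redis-py client, e.g. redis.Redis().
    `run_exclusively` itself is a hypothetical helper, not part of any library.
    """
    # timeout makes the lock expire on its own if the worker dies mid-task.
    lock = client.lock(lock_name, timeout=lock_expire)

    # blocking=False: give up immediately instead of waiting for the holder.
    if not lock.acquire(blocking=False):
        return False

    try:
        work()
    finally:
        # If the lock already expired, redis-py raises a lock error here,
        # which is a hint that lock_expire is shorter than the task run time.
        lock.release()
    return True


# Usage inside a task (assumes a reachable Redis server):
#   import redis
#   client = redis.Redis()
#   if not run_exclusively(client, "my_task", do_the_work):
#       print("Other task is running, skipping")
```

Compared with the cache flag, the redis-py lock stores a unique token per holder, so a task cannot release a lock that another task has acquired after an expiry.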
Have fun!