I tried everything I could find, including:
How to dynamically add / remove periodic tasks to Celery (celerybeat)
Can celery celerybeat dynamically add/remove tasks in runtime?
How to dynamically add or remove tasks to celerybeat?
What I gathered from the above is that if I use only Celery and celery beat, I have to restart celery beat after adding or removing tasks, but I don't have to restart it if I also use django-celery-beat.
I followed the docs step by step:
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
My celeryconfig
BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
My celery beat run command
celery -A tasks beat -l info -S django
This works well; the tasks run as expected. After that, I wrote a script to add tasks at runtime:
import django
django.setup()

from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule

crontab = CrontabSchedule.objects.create(
    minute='*/1',
    hour='*',
    day_of_week='*',
)
period = PeriodicTask.objects.create(
    name='testfasd',
    kwargs={},
    crontab=crontab,
    task='tasks.test',
)
setup_periodic_tasks(app)
When I looked at the database, I saw what I expected: a new record, and the last_update field had been updated. The celery beat logs also confirm this:
[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
My question: although celery beat knows the database has changed, it still sends only the old tasks and does not send the new task to the worker. Any ideas?
I am using Docker for my project; maybe that's related.
Yes you can. Celery is a generic asynchronous task queue. In place of "django_project" you would point to your module.
To use Celery Beat, we need to configure the Redis server in the Django project's settings.py file. Since the Redis server is installed on the local machine, we point the URL to localhost. The CELERY_TIMEZONE variable must be set correctly for tasks to run at the intended times.
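As a rough illustration of the settings described above, a minimal settings.py fragment might look like the following. It assumes the Celery app loads Django settings with namespace='CELERY' (so keys carry the CELERY_ prefix), that Redis listens on its default port 6379 using database 0, and that django-celery-beat's database scheduler is wanted. None of these details come from the text above, so adjust them to your setup.

```python
# settings.py (fragment): a sketch, not a drop-in configuration.
# Assumes the project's celery.py calls:
#   app.config_from_object('django.conf:settings', namespace='CELERY')

# Redis on the local machine; port 6379 and database 0 are assumed defaults.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Timezone used when evaluating schedules.
CELERY_TIMEZONE = 'UTC'

# Optional: keep schedules in the database via django-celery-beat.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```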
When enabled is False, the periodic task becomes idle. You can make it active again by setting enabled = True. If you no longer need the task, you can simply delete the entry.
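A minimal sketch of toggling and deleting an entry, assuming django-celery-beat's PeriodicTask model is passed in as task_model. The helper names set_enabled and remove_task are hypothetical, not part of the library. The sketch saves the instance rather than using a bulk update(), since, as far as I know, it is the model's save and delete signals that tell the database scheduler the schedule changed.

```python
def set_enabled(task_model, name, enabled):
    """Enable or disable the periodic task called `name`; return the saved row."""
    task = task_model.objects.get(name=name)
    task.enabled = enabled
    # Use save() rather than QuerySet.update() so the model's save signals fire.
    task.save()
    return task


def remove_task(task_model, name):
    """Delete the periodic task called `name` once it is no longer needed."""
    task_model.objects.get(name=name).delete()


# Intended usage inside a configured Django project (not executed here):
#   from django_celery_beat.models import PeriodicTask
#   set_enabled(PeriodicTask, 'testfasd', False)   # task becomes idle
#   set_enabled(PeriodicTask, 'testfasd', True)    # task is active again
#   remove_task(PeriodicTask, 'testfasd')
```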
This github issue
[You can't add or delete tasks in celerybeat] currently, you have to restart beat.
No. In order to refresh tasks or task timing inside celery[beat], you must restart the celery[beat] instance. Tasks are loaded into memory at runtime. In order to change/add a task, you must refresh the instance.
You may consider using self-rescheduling tasks with custom timings and conditional execution. Example:
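from datetime import datetime, timedelta, timezone
from celery import shared_task

@shared_task
def check_conditions():
    # Do some db-level code; `condition` is a placeholder for its result
    condition = True
    if condition:
        # eta must be an absolute datetime, so add the delay to the current time
        check_conditions.apply_async(eta=datetime.now(timezone.utc) + timedelta(hours=6))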
I use this in production and it performs well.
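One detail worth checking in a pattern like this: apply_async takes eta as an absolute datetime and countdown as a number of seconds, so a bare timedelta needs converting first. A small sketch of that conversion follows; the helper name next_run_options is made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def next_run_options(delay, now=None):
    """Turn a timedelta into equivalent `eta` and `countdown` values."""
    now = now or datetime.now(timezone.utc)
    return {
        'eta': now + delay,                  # absolute, timezone-aware datetime
        'countdown': delay.total_seconds(),  # relative delay in seconds
    }


opts = next_run_options(timedelta(hours=6))
# Inside the task, pass one of the two, for example:
#   check_conditions.apply_async(eta=opts['eta'])
#   check_conditions.apply_async(countdown=opts['countdown'])
```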
If you need to reload tasks, just programmatically restart celery[beat]:
@shared_task
def autoreload():
    # `condition` and execute_shell_code_to_restart_celery() are placeholders
    if condition:
        execute_shell_code_to_restart_celery()
I have not used this and cannot vouch for its usability, but in theory it should work.
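For illustration only, here is a sketch of what the placeholder restart call could look like if beat happened to run under supervisord as a program named celerybeat. Both the process manager and the program name are assumptions; under Docker or systemd the command would differ. The runner parameter exists only so the function can be exercised without actually restarting anything.

```python
import subprocess

# Assumed command: beat runs under supervisord as a program named "celerybeat".
RESTART_BEAT_CMD = ('supervisorctl', 'restart', 'celerybeat')


def restart_beat(cmd=RESTART_BEAT_CMD, runner=subprocess.run):
    """Run the restart command; return True if it exited with status 0."""
    result = runner(list(cmd), check=False)
    return result.returncode == 0
```

A task such as autoreload() could call restart_beat() where the placeholder appears, provided the worker process has permission to run the command.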
This github issue
I have to reload beat to get this changes updated on worker ... using django-celery-beat ... This problem is still present on 4.0.2 and on master, tested [on December 21st, 2016].