
Can celery, celerybeat and django-celery-beat dynamically add/remove tasks at runtime without restarting celerybeat?

I tried everything I could find, including:

Stack Overflow

How to dynamically add / remove periodic tasks to Celery (celerybeat)

Can celery celerybeat dynamically add/remove tasks in runtime?

GitHub issue

How to dynamically add or remove tasks to celerybeat?

What I gathered from the above is that if I use only celery and celery beat, I have to restart celery beat after adding or removing tasks, but I don't have to restart it if I add django-celery-beat.
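The difference described above can be pictured with a toy model in plain Python. This is only a sketch, not the actual code of celery beat or django-celery-beat: `ScheduleStore`, `SnapshotScheduler`, and `PollingScheduler` are hypothetical names, and the assumption is that a database-backed scheduler re-reads its entries whenever a change marker in the store moves.

```python
class ScheduleStore:
    """Stands in for the database table that holds periodic tasks (hypothetical)."""

    def __init__(self):
        self.entries = {}
        self.version = 0  # bumped on every change, like a "last updated" marker

    def add(self, name, task):
        self.entries[name] = task
        self.version += 1

    def remove(self, name):
        del self.entries[name]
        self.version += 1


class SnapshotScheduler:
    """Copies the schedule once at startup; later changes need a restart."""

    def __init__(self, store):
        self.schedule = dict(store.entries)

    def task_names(self):
        return sorted(self.schedule)


class PollingScheduler:
    """Re-reads the schedule whenever the store's change marker moves."""

    def __init__(self, store):
        self.store = store
        self.seen_version = None
        self.schedule = {}

    def task_names(self):
        if self.seen_version != self.store.version:
            self.schedule = dict(self.store.entries)
            self.seen_version = self.store.version
        return sorted(self.schedule)


store = ScheduleStore()
store.add("add every 10", "tasks.test")
snapshot = SnapshotScheduler(store)
polling = PollingScheduler(store)

store.add("testfasd", "tasks.test")  # added while both schedulers are "running"
snapshot_names = snapshot.task_names()
polling_names = polling.task_names()
```

In this model only the polling scheduler picks up `testfasd`; the snapshot scheduler keeps the schedule it copied at startup until it is rebuilt.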

I followed the docs step by step:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

My celeryconfig

BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

My celery beat run command

celery -A tasks beat -l info -S django

This works well; the tasks run as expected. After that, I wrote a script to add tasks at runtime:

import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule


crontab = CrontabSchedule.objects.create(
    minute='*/1',
    hour='*',
    day_of_week='*',
)

period = PeriodicTask.objects.create(
    name='testfasd',
    kwargs={},
    crontab=crontab,
    task='tasks.test',
)

setup_periodic_tasks(app)
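
One detail worth checking in a script like this: `tasks.test` takes one positional argument, so the database entry needs to carry arguments too. In django-celery-beat the `args` and `kwargs` fields of `PeriodicTask` hold JSON-encoded text. The sketch below only builds the field values in plain Python; `periodic_task_fields` is a hypothetical helper, and whether missing arguments have anything to do with the behaviour described in this question is not established.

```python
import json


def periodic_task_fields(name, task, args=(), kwargs=None):
    """Build keyword arguments for PeriodicTask.objects.create().

    Hypothetical helper: it JSON-encodes the positional and keyword
    arguments, since the model stores them as text.
    """
    return {
        "name": name,
        "task": task,
        "args": json.dumps(list(args)),
        "kwargs": json.dumps(kwargs or {}),
    }


fields = periodic_task_fields("testfasd", "tasks.test", args=["hello"])
# In the script this would be combined with the crontab row, e.g.:
# PeriodicTask.objects.create(crontab=crontab, **fields)
```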

When I looked at the database, I saw what I expected: a new record, and the last_update field had been updated. The celery beat logs also confirm that:

[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)

My question: although celery beat knows the database has changed, it still sends only the old tasks and does not send the new task to the worker. Any idea?

Update

I am using Docker for my project; maybe it's related.

Windsooon asked Dec 20 '16 10:12


1 Answer

From this GitHub issue:

[You can't add or delete tasks in celerybeat] currently, you have to restart beat.

No. To refresh tasks or task timing inside celery[beat], you must restart the celery[beat] instance. Tasks are loaded into memory when the instance starts, so to change or add a task you must restart it.

You may consider using self-recurring tasks with custom timings and conditional execution. Example:

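from datetime import timedelta
from celery import shared_task

@shared_task
def check_conditions():
    # Do some db-level code; `condition` is a placeholder for your own check
    if condition:
        # eta expects an absolute datetime, so pass the delay as a countdown in seconds
        check_conditions.apply_async(countdown=timedelta(hours=6).total_seconds())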

I use this in production and it performs well.
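
When scheduling the next run this way, note that `apply_async` takes `eta` as an absolute `datetime` and `countdown` as a delay in seconds, so a `timedelta` has to be converted into one or the other. A minimal sketch of both conversions in plain Python (the helper names `eta_after` and `countdown_for` are made up for illustration):

```python
from datetime import datetime, timedelta, timezone


def eta_after(delay, now=None):
    """Return the absolute, timezone-aware time at which the task should run."""
    now = now or datetime.now(timezone.utc)
    return now + delay


def countdown_for(delay):
    """Return the same delay expressed as a number of seconds."""
    return delay.total_seconds()


six_hours = timedelta(hours=6)
start = datetime(2016, 12, 20, 12, 0, tzinfo=timezone.utc)
eta = eta_after(six_hours, now=start)
seconds = countdown_for(six_hours)
# Either value could then be passed on, e.g.:
# check_conditions.apply_async(eta=eta)
# check_conditions.apply_async(countdown=seconds)
```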

If you need to reload tasks, just programmatically restart celery[beat]:

@shared_task
def autoreload():
    # `condition` and the restart helper are placeholders to fill in yourself
    if condition:
        execute_shell_code_to_restart_celery()

I have not used this and cannot vouch for its usability, but in theory it should work.
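
The restart helper in that snippet is left undefined. One way to fill it in is to shell out to whatever process manager runs beat. The sketch below assumes such a command exists in your deployment; `restart_beat` is a hypothetical helper, and the `supervisorctl` command mentioned in the comment is only an example, not something from the original answer.

```python
import subprocess
import sys


def restart_beat(command, timeout=30):
    """Run the restart command and report whether it exited successfully."""
    result = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
    return result.returncode == 0


# Stand-in commands so the sketch runs anywhere; a real deployment might use
# something like ["supervisorctl", "restart", "celerybeat"] (an assumption).
ok = restart_beat([sys.executable, "-c", "raise SystemExit(0)"])
failed = restart_beat([sys.executable, "-c", "raise SystemExit(3)"])
```

Returning a boolean lets the calling task log or retry when the restart command fails instead of assuming it worked.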

From this GitHub issue:

I have to reload beat to get this changes updated on worker ... using django-celery-beat ... This problem is still present on 4.0.2 and on master, tested [on December 21st, 2016].

Kris Molinari answered Sep 17 '22 15:09