Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery tasks same schedule with offset

I have several tasks, something like this:

CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'api.tasks.task1',
        'schedule': timedelta(seconds=10),
    },
    'task2': {
        'task': 'api.tasks.task2',
        'schedule': timedelta(seconds=30),
    },
    'task3': {
        'task': 'api.tasks.task3',
        'schedule': timedelta(seconds=15),
    },
    ...
}

So, task1 will be run in *:*:10, *:*:20, *:*:30, *:*:40, *:*:50 and *:*:00

task2 will be run in *:*:30 and *:*:00

task3 will be run in *:*:15, *:*:30, *:*:45 and *:*:00

Then the tasks concurs always in *:*:30 and *:*:00. Is there any way to add an offset. I want to get something like this:

task1 (offset=2) run in *:*:12, *:*:22, *:*:32, *:*:42, *:*:52 and *:*:02

task2 (offset=7) run in *:*:37 and *:*:07

task3 (offset=0) run in *:*:15, *:*:30, *:*:45 and *:*:00

I have read the documentation, and I think I must use crontab, but is not there another way more nice? And crontab has not for seconds configuration :-(

like image 418
Goin Avatar asked Apr 10 '26 16:04

Goin


1 Answers

According to celery documentation:

You can also define your own custom schedule types, by extending the interface of schedule.

So here is my solution:

from datetime import timedelta

from celery import Celery
from celery.schedules import schedule


class MySchedule(schedule):
    def __init__(self, run_every=None, offset=None):
        self._run_every = run_every
        self._offset = offset if offset is not None else timedelta(seconds=0)
        self._do_offset = True if self._offset else False
        super(MySchedule, self).__init__(
            run_every=self._run_every + self._offset)

    def is_due(self, last_run_at):
        ret = super(MySchedule, self).is_due(last_run_at)
        if self._do_offset and ret.is_due:
            self._do_offset = False
            **self._offset = datetime.timedelta(seconds=0)** #bug fix
            self.run_every = self._run_every
            ret = super(MySchedule, self).is_due(last_run_at)
        return ret

    def __reduce__(self):
        return self.__class__, (self._run_every, self._offset)


app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {
    'task1': {
        'task': 'tasks.task1',
        'schedule': MySchedule(
            run_every=timedelta(seconds=10), offset=timedelta(seconds=2)),
    },
    'task2': {
        'task': 'tasks.task2',
        'schedule': MySchedule(
            run_every=timedelta(seconds=30), offset=timedelta(seconds=7)),
    },
    'task3': {
        'task': 'tasks.task3',
        'schedule': MySchedule(
            run_every=timedelta(seconds=15), offset=timedelta(seconds=0)),
    },
}


@app.task
def task1():
    print('task1')


@app.task
def task2():
    print('task2')


@app.task
def task3():
    print('task3')

You can write your own MySchedule and extend it from BaseSchedule to have more control.

like image 104
vasi1y Avatar answered Apr 12 '26 09:04

vasi1y



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!