I have several tasks, something like this:
# Periodic-task table: each entry maps a schedule name to the dotted task
# path to run and the fixed interval between runs.
CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'api.tasks.task1',
        'schedule': timedelta(seconds=10),
    },
    'task2': {
        'task': 'api.tasks.task2',
        'schedule': timedelta(seconds=30),
    },
    'task3': {
        'task': 'api.tasks.task3',
        'schedule': timedelta(seconds=15),
    },
    # ... further task entries elided ...
}
So task1 will run at *:*:10, *:*:20, *:*:30, *:*:40, *:*:50 and *:*:00,
task2 will run at *:*:30 and *:*:00,
and task3 will run at *:*:15, *:*:30, *:*:45 and *:*:00.
As a result, the tasks always coincide at *:*:30 and *:*:00. Is there any way to add an offset? I want to get something like this:
task1 (offset=2) runs at *:*:12, *:*:22, *:*:32, *:*:42, *:*:52 and *:*:02
task2 (offset=7) runs at *:*:37 and *:*:07
task3 (offset=0) runs at *:*:15, *:*:30, *:*:45 and *:*:00
I have read the documentation, and I think I would have to use crontab, but isn't there a nicer way? Also, crontab has no seconds-level configuration :-(
According to the Celery documentation:
You can also define your own custom schedule types, by extending the interface of schedule.
So here is my solution:
from datetime import timedelta
from celery import Celery
from celery.schedules import schedule
class MySchedule(schedule):
    """Interval schedule whose first run is delayed by an extra offset.

    The parent schedule is initialised with ``run_every + offset``. The
    first time the parent reports the schedule as due, the offset is
    dropped and the interval is reset to plain ``run_every``.

    Args:
        run_every: ``timedelta`` between runs. It is added to ``offset``,
            so it must support ``+`` with a ``timedelta``.
        offset: extra ``timedelta`` applied before the first run only;
            ``None`` means no offset.
    """

    def __init__(self, run_every=None, offset=None):
        self._run_every = run_every
        self._offset = offset if offset is not None else timedelta(seconds=0)
        # A zero timedelta is falsy, so no one-off adjustment is armed then.
        self._do_offset = bool(self._offset)
        super(MySchedule, self).__init__(
            run_every=self._run_every + self._offset)

    def is_due(self, last_run_at):
        """Return the parent's due-state, dropping the offset once due.

        Args:
            last_run_at: passed through unchanged to the parent ``is_due``.

        Returns:
            Whatever the parent ``is_due`` returns; when the offset has just
            been consumed, the result of a second call made after the
            interval was reset to ``run_every``.
        """
        ret = super(MySchedule, self).is_due(last_run_at)
        if self._do_offset and ret.is_due:
            self._do_offset = False
            # Clear the stored offset so __reduce__ no longer re-applies it
            # when the schedule is rebuilt from its reduced form.
            self._offset = timedelta(seconds=0)
            self.run_every = self._run_every
            # Re-evaluate with the plain interval; presumably this makes the
            # returned state reflect ``run_every`` rather than the
            # offset-extended interval.
            ret = super(MySchedule, self).is_due(last_run_at)
        return ret

    def __reduce__(self):
        """Rebuild from the base interval and the remaining offset."""
        return self.__class__, (self._run_every, self._offset)
# Celery application named 'tasks', using the pyamqp broker on localhost.
app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Periodic tasks: each entry pairs a task path with a MySchedule built from
# a base interval and a per-task offset.
app.conf.beat_schedule = {
    'task1': {
        'task': 'tasks.task1',
        'schedule': MySchedule(
            run_every=timedelta(seconds=10),
            offset=timedelta(seconds=2),
        ),
    },
    'task2': {
        'task': 'tasks.task2',
        'schedule': MySchedule(
            run_every=timedelta(seconds=30),
            offset=timedelta(seconds=7),
        ),
    },
    'task3': {
        'task': 'tasks.task3',
        'schedule': MySchedule(
            run_every=timedelta(seconds=15),
            offset=timedelta(seconds=0),
        ),
    },
}
@app.task
def task1():
    """Print 'task1'."""
    print('task1')
@app.task
def task2():
    """Print 'task2'."""
    print('task2')
@app.task
def task3():
    """Print 'task3'."""
    print('task3')
Alternatively, you can write your own schedule class that extends BaseSchedule directly to have more control.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With