Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery - Schedule periodic task at the end of another task

Tags:

python

celery

I want to schedule a periodic task with Celery dynamically at the end of another group of task.

I know how to create (static) periodic tasks with Celery:

CELERYBEAT_SCHEDULE = {
      'poll_actions': {
          'task': 'tasks.poll_actions',
          'schedule': timedelta(seconds=5)
      }
}

But I want to create periodic jobs dynamically from my tasks (and maybe have a way to stop those periodic jobs when some condition is achieved (all tasks done).

Something like:

@celery.task
def run(ids):
    group(prepare.s(id) for id in ids) | execute.s(ids) | poll.s(ids, schedule=timedelta(seconds=5))

@celery.task
def prepare(id):
    ...

@celery.task
def execute(id):
    ...

@celery.task
def poll(ids):
    # This task has to be schedulable on demand
    ...
like image 686
JahMyst Avatar asked Dec 12 '16 19:12

JahMyst


1 Answers

The straightforward solution to this requires that you be able to add/remove beat scheduler entries on the fly. As of the answering of this question...

How to dynamically add / remove periodic tasks to Celery (celerybeat)

This was not possible. I doubt it has become available in the interim because ...

You are conflating two concepts here. The notion of "Event Driven Work" and the idea of "Batch Schedule Driven Work"(which is really just the first case where the event happens on a schedule). If you really consider what you are doing here you'll find that there is a rather complex set of edge cases. Messages are distributed in nature what happens when groups spawned from different messages start creating conflicting entries? What do you do when you find yourself under a mountain of previously scheduled kruft?

When working with messaging systems you are really looking to build recursive trees. Spindles of work that do something and spawn more messages to do more things. Cycles(intended or otherwise) aside these ultimately achieve their base cases and terminate.

The answer to whatever you are actually trying to achieve lies with re-encoding your problem within the limitations of your messaging system and asynchronous work framework.

like image 172
nsfyn55 Avatar answered Oct 24 '22 20:10

nsfyn55