Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery task that runs more tasks

I am using celerybeat to kick off a primary task that kicks of a number of secondary tasks. I have both tasks written already.

Is there a way to easily do this? Does Celery allow for tasks to be run from within tasks?

My example:

@task def compute(users=None):     if users is None:         users = User.objects.all()      tasks = []     for user in users:         tasks.append(compute_for_user.subtask((user.id,)))      job = TaskSet(tasks)     job.apply_async() # raises a IOError: Socket closed  @task def compute_for_user(user_id):     #do some stuff 

compute gets called from celerybeat, but causes an IOError when it tries to run apply_async. Any ideas?

like image 932
Mantas Vidutis Avatar asked Jun 14 '11 20:06

Mantas Vidutis


People also ask

Can celery task call another task?

To answer your opening questions: As of version 2.0, Celery provides an easy way to start tasks from other tasks. What you are calling "secondary tasks" are what it calls "subtasks".

What is Celerybeat schedule?

celery beat is a scheduler; It kicks off tasks at regular intervals, that are then executed by available worker nodes in the cluster. By default the entries are taken from the beat_schedule setting, but custom stores can also be used, like storing the entries in a SQL database.


2 Answers

To answer your opening questions: As of version 2.0, Celery provides an easy way to start tasks from other tasks. What you are calling "secondary tasks" are what it calls "subtasks". See the documentation for Sets of tasks, Subtasks and Callbacks, which @Paperino was kind enough to link to.

For version 3.0, Celery changed to using groups for this, and other, types of behavior.

Your code shows that you are already familiar with this interface. Your actual question seems to be, "Why am I getting a 'Socket Closed' IOError when I try to run my set of subtasks?" I don't think anyone can answer that, because you have not provided enough information about your program. Your excerpt cannot be run as-is, so we cannot examine the problem you're having for ourselves. Please post the stacktrace provided with the IOError, and with any luck, someone that can help you with your crasher will come along.

like image 125
Jeremy W. Sherman Avatar answered Sep 21 '22 08:09

Jeremy W. Sherman


You can use something like this (Support in 3.0 )

g = group(compute_for_user.s(user.id) for user in users) g.apply_async() 
like image 38
Abhilash Joseph Avatar answered Sep 21 '22 08:09

Abhilash Joseph