
Add n tasks to celery queue and wait for the results

I'd like to add several jobs to the Celery queue and wait for the results. I have several ideas for doing this with some kind of shared storage (memcached, Redis, a database, etc.), but I suspect it is something Celery can handle automatically. I can't find any resources online, though.

Code example

def do_tasks(b):
    for a in b:
        c.delay(a)

    return c.all_results_some_how()
Prydie asked Nov 01 '14 06:11




3 Answers

For Celery >= 3.0, TaskSet is deprecated in favour of group.

from celery import group
from tasks import add

job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8),
    add.s(16, 16),
    add.s(32, 32),
])

Start the group in the background:

result = job.apply_async()

Wait:

result.join()
laffuste answered Oct 12 '22 23:10


Task.delay returns an AsyncResult. Call AsyncResult.get to retrieve the result of each task.

To do that you need to keep references to the tasks.

def do_tasks(b):
    tasks = []
    for a in b:
        tasks.append(c.delay(a))
    return [t.get() for t in tasks]

Or you can use ResultSet:

UPDATE: ResultSet is deprecated; please see @laffuste's answer.

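from celery.result import ResultSet

def do_tasks(b):
    # collect every AsyncResult in a single ResultSet
    rs = ResultSet([])
    for a in b:
        rs.add(c.delay(a))
    # block until all tasks have finished, then return their results
    return rs.get()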
falsetru answered Oct 13 '22 00:10


I have a hunch that what you really want is not delay itself but Celery's async feature.

I think you really want a TaskSet:

from celery.task.sets import TaskSet
from someapp.tasks import sometask

def do_tasks(b):
    job = TaskSet([sometask.subtask((a,)) for a in b])
    result = job.apply_async()
    # might want to handle result.successful() == False
    return result.join()
Árni St. Sigurðsson answered Oct 13 '22 00:10