Is this the best way to use Celery for a pipeline: TaskA -> TaskB -> TaskC? I know nothing about Celery, and the documentation doesn't really explain how things should be done in a real application.
@app.task
def taskA(t_id):
    # 'success' stands for whatever check this task performs on its own work
    if success:
        taskB.delay(t_id)
    else:
        reportError.delay(t_id)

@app.task
def taskAA(t_id):
    if success:
        taskB.delay(t_id)
    else:
        reportError.delay(t_id)

@app.task
def taskB(t_id):
    if success:
        taskC.delay(t_id)
    else:
        reportError.delay(t_id)
Maybe I shouldn't use Celery for this kind of task...
Celery is an asynchronous task queue/job queue based on distributed message passing. It is an open-source Python library that holds tasks in a queue and distributes them to workers. It is focused on real-time operation but also supports scheduling (for example, tasks that run at regular intervals). The execution units, called tasks, are executed concurrently on one or more worker servers using multiprocessing, Eventlet, or gevent.
Multiprocessing is achieved by using Celery workers (subprocesses). Each worker executes the tasks (a function, a series of jobs, …) you give it and sends the result back to the caller.
It allows you to offload work from your Python app. Once you integrate Celery into your app, you can send time-intensive tasks to Celery's task queue. That way, your web app can continue to respond quickly to users while Celery completes expensive operations asynchronously in the background.
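As a rough sketch of what that looks like (the module name, the Redis broker/backend URLs, and the resize_image task are made-up examples for illustration, not anything from the question):

from celery import Celery

# Hypothetical app; the Redis broker/backend URLs are assumptions.
app = Celery('myapp',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def resize_image(image_id):
    # Stand-in for a time-intensive operation.
    return 'resized %s' % image_id

# From the web app: enqueue the work and return to the user immediately.
result = resize_image.delay(42)

# Later (or in another process), block until a worker has finished and fetch the result.
print(result.get(timeout=10))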
As the other commenters state, you can use a chain. See https://celery.readthedocs.org/en/latest/userguide/canvas.html#chains for more info. Based on your original question, like so:
from celery import chain

ret = chain(taskA.s(t_id), taskB.s(), taskC.s()).apply_async()
result = ret.get()  # blocks until the whole chain has finished (raises if it failed)
if ret.status == 'SUCCESS':
    print("result:", result)

Note that in a chain each task receives the return value of the previous task as its first argument, so taskA and taskB would need to return t_id for the signatures in your question to line up.
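Since your original code routes failures to ReportError, the chain version can do something similar with an error callback. A rough sketch, assuming a recent Celery version (where error callbacks receive the request, the exception, and the traceback) and the same app object; report_error here is a hypothetical task, not the one from your question:

@app.task
def report_error(request, exc, traceback):
    # Called if any task in the chain fails.
    print('task %s failed: %r' % (request.id, exc))

ret = chain(taskA.s(t_id), taskB.s(), taskC.s()).apply_async(
    link_error=report_error.s())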