 

CeleryExecutor in Airflow is not parallelizing tasks in a subdag

We're using Airflow 1.10.0. While analyzing why some of our ETL processes were taking so long, we saw that the subdags were using a SequentialExecutor instead of the BaseExecutor, even though we have configured the CeleryExecutor.

I would like to know whether this is a bug or expected behavior of Airflow. It doesn't make any sense to have the capability to execute tasks in parallel but to lose that capability for one specific kind of task.

Execution of our SubDag (zoom in on the subdag)

Flavio Avatar asked Aug 16 '18 21:08

Flavio


People also ask

What is CeleryExecutor in Airflow?

CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to set up a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor, providing the related Celery settings.
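Under the assumption of a Redis broker and a Postgres result backend (hostnames and credentials below are illustrative, not from the question), the relevant airflow.cfg changes might look like:

```ini
[core]
# Switch from the default executor to Celery
executor = CeleryExecutor

[celery]
# Message broker the workers pull tasks from (illustrative URL)
broker_url = redis://localhost:6379/0
# Where Celery stores task state/results (illustrative URL)
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow
```

After changing the config, you would start one or more `airflow worker` processes alongside the scheduler so Celery has workers to dispatch tasks to.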

How many tasks can you execute in parallel in Airflow?

Apache Airflow's capability to run parallel tasks, ensured by using Kubernetes and CeleryExecutor, allows you to save a lot of time. You can use it to execute even 1000 parallel tasks in only 5 minutes.

How do I run parallel tasks in Airflow?

By default, Airflow uses the SequentialExecutor, which executes tasks sequentially no matter what. To allow Airflow to run tasks in parallel, you need to create a database in Postgres or MySQL, configure it in airflow.cfg (the sql_alchemy_conn parameter), and then change your executor to LocalExecutor in airflow.cfg.
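As a sketch, the two settings mentioned above might look like this in airflow.cfg (the connection string is illustrative; adjust host, user, and database to your setup):

```ini
[core]
# LocalExecutor runs tasks in parallel subprocesses on a single machine
executor = LocalExecutor
# Must point to a Postgres or MySQL database; SQLite only supports SequentialExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
```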

How do you set up celery executor Airflow?

To set up the Airflow Celery Executor, first you need to set up a Celery backend using a message broker service such as RabbitMQ or Redis. After that, you need to change the airflow.cfg file to point the executor parameter to CeleryExecutor and enter all the required configuration for it.


2 Answers

It is a typical pattern to use the SequentialExecutor in subdags, the idea being that subdags often execute many similar, related tasks, and you don't necessarily want the added overhead of pushing each one through Celery queues, etc. See the "other tips" section in the Airflow docs for subdags: https://airflow.apache.org/concepts.html#subdags

By default, subdags are set to use the SequentialExecutor (see: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38), but you can change that.

To use the Celery executor, pass it in when creating your subdag:

from airflow.executors.celery_executor import CeleryExecutor

mysubdag = SubDagOperator(
    executor=CeleryExecutor(),
    ...
)
Charlie Gelman Avatar answered Sep 29 '22 21:09

Charlie Gelman


Maybe a little bit late, but using the LocalExecutor works for me:

from airflow.executors.local_executor import LocalExecutor

subdag = SubDagOperator(
    task_id=task_id,
    default_args=default_args,
    executor=LocalExecutor(),
    dag=dag,
)

strider Avatar answered Sep 29 '22 23:09

strider