We're using Airflow 1.10.0, and while analyzing why some of our ETL processes take so long, we saw that subdags use a SequentialExecutor instead of the default executor (BaseExecutor) or the CeleryExecutor we configured.
I would like to know if this is a bug or expected behavior of Airflow. It doesn't make sense to have the capability to execute tasks in parallel and then lose it for one specific kind of task.
CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to set up a Celery backend (RabbitMQ, Redis, …), change your airflow.cfg to point the executor parameter to CeleryExecutor, and provide the related Celery settings.
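As a concrete illustration, here is a minimal sketch of the relevant airflow.cfg entries, assuming Redis as the broker and Postgres as the metadata database and result backend; all connection strings below are placeholders, and exact key names can vary slightly between Airflow versions:

[core]
# switch away from the default SequentialExecutor
executor = CeleryExecutor
# Celery needs a real database; SQLite only supports the SequentialExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

[celery]
# placeholder connection strings; point these at your own services
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow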
Apache Airflow's ability to run tasks in parallel, provided by executors such as the CeleryExecutor or by running on Kubernetes, can save a lot of time: a sufficiently scaled-out deployment can execute on the order of 1,000 parallel tasks in a few minutes.
By default, Airflow uses the SequentialExecutor, which executes tasks sequentially no matter what. To allow Airflow to run tasks in parallel, you need to create a database in Postgres or MySQL, configure it in airflow.cfg (the sql_alchemy_conn parameter), and then change your executor to LocalExecutor in airflow.cfg.
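For example, a minimal sketch of that airflow.cfg change, assuming a local Postgres database (the connection string is a placeholder):

[core]
# LocalExecutor runs task instances as parallel subprocesses on one machine
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
# optional concurrency caps (these are Airflow's defaults)
parallelism = 32
dag_concurrency = 16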
To set up the Airflow Celery Executor, you first need to set up a Celery backend using a message broker service such as RabbitMQ or Redis. After that, change the airflow.cfg file to point the executor parameter to CeleryExecutor and enter all the required configurations for it.
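You also need at least one worker process consuming tasks from the broker; in Airflow 1.10 you start one on each worker machine with (assuming Airflow is installed and configured there):

airflow worker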
It is a typical pattern to use the SequentialExecutor in subdags, the idea being that you are often executing a lot of similar, related tasks and don't necessarily want the added overhead of queuing each of them through Celery, etc. See the "other tips" section in the Airflow docs for subdags: https://airflow.apache.org/concepts.html#subdags
By default, subdags are set to use the SequentialExecutor (see: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38), but you can change that.
To use the CeleryExecutor instead, pass it in when creating your subdag operator:
from airflow.executors.celery_executor import CeleryExecutor

mysubdag = SubDagOperator(
    executor=CeleryExecutor(),
    # ... task_id, subdag, dag, and the rest of your SubDagOperator arguments
)
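For context, here is a minimal self-contained sketch of where that argument fits, assuming a trivial subdag factory; the DAG ids, schedule, and dummy tasks below are made up for illustration:

from datetime import datetime

from airflow import DAG
from airflow.executors.celery_executor import CeleryExecutor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {"owner": "airflow", "start_date": datetime(2018, 1, 1)}

def make_subdag(parent_dag_id, child_dag_id, args):
    # subdag dag_ids must follow the <parent_id>.<child_id> convention
    subdag = DAG(
        dag_id="%s.%s" % (parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",  # keep in step with the parent DAG
    )
    # independent dummy tasks, so there is actually something to parallelize
    for i in range(5):
        DummyOperator(task_id="task_%d" % i, dag=subdag)
    return subdag

with DAG(dag_id="parent_dag", default_args=default_args,
         schedule_interval="@daily") as dag:
    SubDagOperator(
        task_id="my_subdag",
        subdag=make_subdag("parent_dag", "my_subdag", default_args),
        executor=CeleryExecutor(),  # override the SequentialExecutor default
    )

With this in place, the dummy tasks inside the subdag are queued to your Celery workers like any other task instead of running one at a time.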
Maybe a little bit late, but using the LocalExecutor works for me:
from airflow.executors.local_executor import LocalExecutor

subdag = SubDagOperator(
    task_id=task_id,
    subdag=child_dag,  # the nested DAG object (required; child_dag is a placeholder name)
    default_args=default_args,
    executor=LocalExecutor(),
    dag=dag,
)