 

Possible to set different executor for each Airflow DAG?

Tags:

airflow

I am looking to add another DAG to an existing Airflow server. The server is currently using LocalExecutor but I might want my DAG to use CeleryExecutor. It seems like the configuration file airflow.cfg only allows one executor:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor

Is it possible to configure Airflow so that the existing DAGs continue to use LocalExecutor while my new DAG uses CeleryExecutor or a custom executor class? I haven't found any examples of people doing this, nor anything in the Airflow documentation.
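The "only one executor" limitation follows from the INI format: `executor` is a single key with a single value. A minimal sketch using Python's standard `configparser` shows this; it assumes the setting lives under a `[core]` section (where airflow.cfg keeps it) and uses the standard-library parser, not Airflow's own configuration loader, so treat it as an illustration only. The `DUPLICATE` string is a hypothetical attempt to list two executors.

```python
import configparser

# The question's setting, placed under [core] (assumed section name).
SINGLE = """
[core]
executor = LocalExecutor
"""

# Hypothetical attempt to declare a second executor under the same key.
DUPLICATE = """
[core]
executor = LocalExecutor
executor = CeleryExecutor
"""


def read_executor(text: str) -> str:
    """Return the single [core] executor value from an INI-style string."""
    parser = configparser.ConfigParser()  # strict=True by default
    parser.read_string(text)
    return parser.get("core", "executor")


if __name__ == "__main__":
    print(read_executor(SINGLE))  # LocalExecutor
    try:
        read_executor(DUPLICATE)
    except configparser.DuplicateOptionError as err:
        # strict mode refuses a repeated option inside one section
        print("rejected:", err.option)  # rejected: executor
```

With the default strict parser, a second `executor` line is an error rather than a second choice; a non-strict parser would silently keep only the last value.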

asked Aug 03 '16 17:08 by Desiree Cox


People also ask

What is sequential executor in Airflow?

The SequentialExecutor is the default executor when you first install Airflow. It is the only executor that can be used with SQLite, since SQLite doesn't support multiple connections. This executor runs only one task instance at a time. For production use cases, use one of the other executors.
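To make "one task instance at a time" concrete, here is a toy executor that drains a queue strictly in order. It is a hypothetical illustration, not Airflow's `SequentialExecutor`; the class and method names (`queue_task`, `run_all`) are invented for this sketch.

```python
from collections import deque
from typing import Callable, Deque, List, Tuple


class ToySequentialExecutor:
    """Hypothetical stand-in: runs queued tasks one after another."""

    def __init__(self) -> None:
        self.queue: Deque[Tuple[str, Callable[[], None]]] = deque()
        self.log: List[str] = []

    def queue_task(self, task_id: str, fn: Callable[[], None]) -> None:
        self.queue.append((task_id, fn))

    def run_all(self) -> List[str]:
        # Each task starts only after the previous one has returned,
        # so at most one task is ever running.
        while self.queue:
            task_id, fn = self.queue.popleft()
            self.log.append(f"start {task_id}")
            fn()
            self.log.append(f"end {task_id}")
        return self.log


if __name__ == "__main__":
    ex = ToySequentialExecutor()
    ex.queue_task("a", lambda: None)
    ex.queue_task("b", lambda: None)
    print(ex.run_all())  # ['start a', 'end a', 'start b', 'end b']
```

The start/end entries never interleave, which is exactly the property that lets this style of executor work with a single SQLite connection.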

How many tasks can an Airflow worker handle?

You can also tune worker_concurrency (environment variable: AIRFLOW__CELERY__WORKER_CONCURRENCY), which determines how many tasks each Celery worker can run at any given time. By default, each Celery worker runs at most sixteen tasks concurrently.
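A rough sketch of what this setting caps: it reads the environment variable named above (falling back to 16) and runs dummy tasks through a bounded pool, recording the peak number running at once. It models the limit with a thread pool for simplicity; Celery workers typically use a process pool, and the helper names here are hypothetical.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

DEFAULT_WORKER_CONCURRENCY = 16  # default quoted in the text


def worker_concurrency() -> int:
    # Variable name from the text; falls back to the default when unset.
    value = os.environ.get("AIRFLOW__CELERY__WORKER_CONCURRENCY")
    return int(value) if value else DEFAULT_WORKER_CONCURRENCY


def run_tasks(n_tasks: int, concurrency: int) -> int:
    """Run n_tasks dummy tasks, at most `concurrency` at once; return the peak."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # simulate work
        with lock:
            running -= 1

    # The pool size is the concurrency cap: extra tasks wait for a free slot.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        for _ in range(n_tasks):
            pool.submit(task)
    return peak
```

Calling `run_tasks(20, 4)` never reports a peak above 4, no matter how many tasks are queued.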

What is the purpose of an executor in Airflow?

The executor acts as a middleman that handles resource utilization and decides how best to distribute work. Although an Airflow job is organized at the DAG level, its execution phase is more granular: the executor runs individual tasks.
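The split between DAG-level organization and task-level execution can be sketched as follows: a toy scheduler resolves dependencies with the standard library's `graphlib.TopologicalSorter` (Python 3.9+), while a toy executor only ever receives single task ids. The DAG contents and class names are hypothetical, and this is not how Airflow's scheduler is implemented internally.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical DAG: each task maps to the set of tasks it depends on.
DAG: Dict[str, Set[str]] = {
    "extract": set(),
    "transform_a": {"extract"},
    "transform_b": {"extract"},
    "load": {"transform_a", "transform_b"},
}


class ToyExecutor:
    """Receives individual task ids; knows nothing about the DAG structure."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def execute(self, task_id: str) -> None:
        # A real executor would run or dispatch the task here.
        self.executed.append(task_id)


def schedule(dag: Dict[str, Set[str]], executor: ToyExecutor) -> List[str]:
    # The scheduler reasons about dependencies at the DAG level...
    sorter = TopologicalSorter(dag)
    sorter.prepare()
    while sorter.is_active():
        for task_id in sorter.get_ready():
            # ...but hands the executor one task at a time.
            executor.execute(task_id)
            sorter.done(task_id)
    return executor.executed
```

`schedule(DAG, ToyExecutor())` always starts with `extract` and ends with `load`; the two transforms may come in either order.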

What is Celery executor in Airflow?

CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to set up a Celery backend (RabbitMQ, Redis, …), change the executor parameter in your airflow.cfg to CeleryExecutor, and provide the related Celery settings.
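As a sketch of "change the executor parameter and provide the related Celery settings", the helper below builds the equivalent environment-variable overrides, using Airflow's `AIRFLOW__{SECTION}__{KEY}` naming convention. The key names follow recent Airflow 2.x configuration and may differ in older releases; the broker and result-backend URLs in the usage line are placeholders, not working endpoints.

```python
from typing import Dict


def airflow_env_var(section: str, key: str) -> str:
    # Airflow reads AIRFLOW__{SECTION}__{KEY} as an override for the
    # matching airflow.cfg entry.
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def celery_settings(broker_url: str, result_backend: str) -> Dict[str, str]:
    """Return env-var overrides that switch an install to CeleryExecutor."""
    return {
        airflow_env_var("core", "executor"): "CeleryExecutor",
        airflow_env_var("celery", "broker_url"): broker_url,
        airflow_env_var("celery", "result_backend"): result_backend,
    }


if __name__ == "__main__":
    # Placeholder URLs: substitute your own broker and result database.
    env = celery_settings(
        "redis://localhost:6379/0",
        "db+postgresql://user:pass@localhost/airflow",
    )
    for name, value in env.items():
        print(f"{name}={value}")
```

Note that, like the airflow.cfg entry, `AIRFLOW__CORE__EXECUTOR` is still a single value for the whole installation.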


2 Answers

If you have a SubDAG within your DAG, you can pass in a specific executor to that SubDagOperator. For instance, to use a SequentialExecutor:

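from airflow.executors.sequential_executor import SequentialExecutor
from airflow.operators.subdag_operator import SubDagOperator  # Airflow 1.x import path

# my_subdag (a factory returning the child DAG, whose dag_id must be 'foo.bar'),
# default_args and foo_dag are assumed to be defined earlier in the DAG file.
bar_subdag = SubDagOperator(
    task_id='bar',
    subdag=my_subdag('foo', 'bar', default_args),
    default_args=default_args,
    dag=foo_dag,
    executor=SequentialExecutor()  # tasks inside the subdag run on this executor
)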

This is for Airflow 1.8; I'm not sure whether 1.9 is different.
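Why this workaround helps: the SubDagOperator is itself a single task picked up by the server-wide executor, and the subdag's child tasks are then run by whichever executor was passed to it. The toy model below mimics that nesting with hypothetical classes; the executor names are just labels and nothing here is Airflow's actual implementation.

```python
from typing import Callable, List


class RecordingExecutor:
    """Toy executor: runs tasks immediately and records which ones it ran."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.ran: List[str] = []

    def run(self, task_id: str, fn: Callable[[], None]) -> None:
        self.ran.append(task_id)
        fn()


def make_subdag_task(child_ids: List[str],
                     sub_executor: RecordingExecutor) -> Callable[[], None]:
    """Hypothetical analogue of SubDagOperator(executor=...): one outer task
    that runs its children on sub_executor."""
    def run_subdag() -> None:
        for child in child_ids:
            sub_executor.run(child, lambda: None)
    return run_subdag


main_executor = RecordingExecutor("LocalExecutor")      # server-wide executor
sub_executor = RecordingExecutor("SequentialExecutor")  # passed to the subdag only
main_executor.run("bar", make_subdag_task(["bar.t1", "bar.t2"], sub_executor))
```

The main executor only ever sees `bar`; the children `bar.t1` and `bar.t2` are handled by the subdag's executor. This gives per-subdag control, but the top-level DAG itself still runs on the configured executor.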

answered Oct 23 '22 10:10 by mithos


It seems the scheduler only starts a single instance of the executor, so every DAG it schedules goes through that one executor.
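A toy model of that behaviour, assuming a scheduler that is constructed with exactly one executor: there is no per-DAG lookup, so tasks from the existing DAG and a new DAG land in the same queue. The class names are hypothetical and greatly simplified compared with Airflow's real scheduler.

```python
from typing import List


class QueueingExecutor:
    """Toy executor that records the task instances handed to it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.queued: List[str] = []

    def queue(self, task_key: str) -> None:
        self.queued.append(task_key)


class ToyScheduler:
    """Hypothetical scheduler holding exactly one executor, chosen at startup."""

    def __init__(self, executor: QueueingExecutor) -> None:
        self.executor = executor

    def submit(self, dag_id: str, task_id: str) -> None:
        # No per-DAG routing: every task goes to the same executor.
        self.executor.queue(f"{dag_id}.{task_id}")


scheduler = ToyScheduler(QueueingExecutor("LocalExecutor"))
scheduler.submit("existing_dag", "t1")
scheduler.submit("new_dag", "t1")
```

Both submissions end up in the single `LocalExecutor` queue, which is why setting a different executor per DAG is not possible through the configuration alone.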

answered Oct 23 '22 10:10 by Scott Ding