
How to control DAG concurrency in Airflow

Tags:

python

airflow

I use Airflow v1.7.1.3.

I have two DAGs, dag_a and dag_b. I trigger 10 dag_a runs at one time; since dag_a is defined with concurrency=1, its task instances should in theory execute one by one. In reality, the 10 dag_a tasks run in parallel, and the concurrency parameter doesn't seem to take effect. Can anyone tell me why?

Here's the pseudocode:

in dag_a.py

from datetime import datetime

from airflow import DAG

dag = DAG('dag_a',
          start_date=datetime.now(),
          default_args=default_args,
          schedule_interval=None,
          concurrency=1,
          max_active_runs=1)

in dag_b.py

import logging as log
import time
from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator
from fabric.api import local

dag = DAG('dag_b',
          start_date=datetime.now(),
          default_args=default_args,
          schedule_interval='0 22 */1 * *',
          concurrency=1,
          max_active_runs=1)


def trigger_dag_a(**context):

    for rec in range(10):    # was rang(1,10): a typo, and only 9 iterations
        time.sleep(2)
        cmd = "airflow trigger_dag dag_a"

        log.info("cmd:%s" % cmd)
        msg = local(cmd)    # "local" is a function from fabric
        log.info(msg)


trigger_dag_a_proc = PythonOperator(python_callable=trigger_dag_a,
                          provide_context=True,
                          task_id='trigger_dag_a_proc',
                          dag=dag)
asked Aug 18 '17 09:08 by commissar wu


People also ask

What is DAG concurrency in Airflow?

concurrency: the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. This lets you allow one DAG to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.
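As a rough mental model of that cap (a plain-Python toy, not Airflow's actual scheduler code), you can think of the scheduler filling up to `concurrency` "slots" before anything else is allowed to start:

```python
from collections import deque

def run_with_concurrency(tasks, concurrency):
    """Toy simulation: never let more than `concurrency` task
    instances of one DAG be 'running' at the same moment."""
    queue = deque(tasks)   # task instances waiting to be scheduled
    running = []           # task instances currently executing
    peak = 0               # highest number observed running at once
    finished = []
    while queue or running:
        # fill free slots up to the concurrency cap
        while queue and len(running) < concurrency:
            running.append(queue.popleft())
        peak = max(peak, len(running))
        finished.append(running.pop(0))  # one running task completes
    return finished, peak

done, peak = run_with_concurrency([f"task_{i}" for i in range(10)],
                                  concurrency=1)
```

With concurrency=1, `peak` stays at 1 and the ten tasks complete strictly in order; with a higher cap, up to that many run side by side.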

How do you run Airflow DAGs in parallel?

So to allow Airflow to run tasks in parallel, you need to create a database in Postgres or MySQL, point Airflow at it in airflow.cfg (the sql_alchemy_conn parameter), change the executor to LocalExecutor in airflow.cfg, and then run airflow initdb.

How many tasks can you execute in parallel in Airflow?

Apache Airflow's ability to run tasks in parallel, provided by the CeleryExecutor or Kubernetes, can save a lot of time: with enough workers it can execute even 1000 parallel tasks in only a few minutes.

How do I stop an Airflow DAG?

Note that if the DAG is currently running, the Airflow scheduler will restart the tasks you delete. So either stop the DAG first by changing its state, or stop the scheduler (if you are running in a test environment).


1 Answer

You can limit your task instances by specifying a pool.

1. Create a pool in the Airflow UI:

(screenshot: the Pools admin page)

2. Then set up your DAGs to use this pool:

from datetime import datetime

from airflow import DAG

default_args = {
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime(2017, 12, 16),
    'pool': 'my_pool'
}

dag = DAG(
    dag_id='foo',
    schedule_interval='@daily',
    default_args=default_args,
)
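The effect of a pool can be sketched in plain Python (a toy model, not Airflow's implementation — the name `my_pool` and the 1-slot size are just the example from above): every task that declares the pool competes for the same fixed number of slots, and tasks that can't get a slot stay queued.

```python
class Pool:
    """Toy model of an Airflow pool: a fixed number of slots shared
    by every task instance that declares pool='my_pool'."""
    def __init__(self, slots):
        self.slots = slots   # total capacity of the pool
        self.used = 0        # slots currently held by running tasks
    def try_acquire(self):
        if self.used < self.slots:
            self.used += 1
            return True
        return False         # no free slot: task stays queued
    def release(self):
        self.used -= 1       # a running task finished, free its slot

pool = Pool(slots=1)
first = pool.try_acquire()    # first task instance takes the slot
second = pool.try_acquire()   # second task must wait in the queue
pool.release()                # first task finishes
third = pool.try_acquire()    # now the next task can start
```

With a 1-slot pool, this gives the one-at-a-time behaviour the question asks for, regardless of how many dag_a runs are triggered.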
answered Nov 15 '22 07:11 by x97Core