I'm using Airflow v1.7.1.3.
I have two DAGs, dag_a and dag_b. I trigger dag_a 10 times at once, so in theory the 10 dag_a runs should execute one by one. In reality, the 10 dag_a runs are executed in parallel, so the concurrency parameter doesn't seem to work. Can anyone tell me why?
Here's the pseudocode:
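In dag_a.py:

from datetime import datetime
from airflow import DAG

# default_args is defined elsewhere in the file
dag = DAG('dag_a',
          start_date=datetime.now(),
          default_args=default_args,
          schedule_interval=None,
          concurrency=1,
          max_active_runs=1)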
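In dag_b.py:

import logging
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from fabric.api import local  # Fabric 1.x: runs a shell command locally

log = logging.getLogger(__name__)

# default_args is defined elsewhere in the file
dag = DAG('dag_b',
          start_date=datetime.now(),
          default_args=default_args,
          schedule_interval='0 22 */1 * *',
          concurrency=1,
          max_active_runs=1)


def trigger_dag_a(**context):
    # Trigger 10 runs of dag_a, two seconds apart
    for _ in range(10):
        time.sleep(2)
        cmd = "airflow trigger_dag dag_a"
        log.info("cmd: %s", cmd)
        msg = local(cmd)
        log.info(msg)


trigger_dag_a_proc = PythonOperator(python_callable=trigger_dag_a,
                                    provide_context=True,
                                    task_id='trigger_dag_a_proc',
                                    dag=dag)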
concurrency: This is the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. This lets you allow one DAG to run 32 tasks at once while another DAG can run only 16 tasks at once.
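To make the definition concrete, here is a toy Python model of a DAG-wide concurrency cap. It is not Airflow's scheduler code: it assumes every task takes one scheduler "tick" and that the tasks inside each DAG run form a simple chain. simulate, the run ids and the task names are hypothetical.

```python
from typing import Dict, List, Tuple


def simulate(runs: Dict[str, List[str]], concurrency: int) -> List[List[Tuple[str, str]]]:
    """Toy model of a DAG-wide concurrency cap (not Airflow's scheduler).

    runs maps a run_id to that run's tasks, which must run in order.
    Each task takes one tick, and at most `concurrency` task instances
    run per tick, counted across ALL active runs of the DAG.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    progress = {run_id: 0 for run_id in runs}  # index of next task per run
    ticks = []
    while any(progress[r] < len(tasks) for r, tasks in runs.items()):
        batch = []
        for run_id, tasks in runs.items():
            if len(batch) == concurrency:
                break  # DAG-wide cap reached for this tick
            i = progress[run_id]
            if i < len(tasks):
                batch.append((run_id, tasks[i]))
        for run_id, _ in batch:
            progress[run_id] += 1
        ticks.append(batch)
    return ticks


# Ten triggered runs of a DAG with two chained (hypothetical) tasks each
runs = {f"run_{n}": ["extract", "load"] for n in range(10)}
print(len(simulate(runs, concurrency=1)))   # 20 ticks, one task at a time
print(len(simulate(runs, concurrency=32)))  # 2 ticks, 10 tasks in each
```

With concurrency=1 the cap is shared by all ten runs, so only one task instance of the DAG runs at any moment, even though ten runs are active.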
So, to allow Airflow to run tasks in parallel, you need to create a PostgreSQL or MySQL database, configure it in airflow.cfg (the sql_alchemy_conn parameter), change the executor to LocalExecutor in airflow.cfg, and then run airflow initdb. The default SQLite setup only supports the SequentialExecutor, which runs one task instance at a time.
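If you prefer to script the airflow.cfg change instead of editing the file by hand, a minimal sketch with Python's standard configparser could look like this. It assumes both settings live in the [core] section, as in Airflow 1.x; configure_local_executor and the connection URI are hypothetical placeholders. Note that configparser drops comments when it writes the file back.

```python
import configparser
import io


def configure_local_executor(cfg_text: str, conn_uri: str) -> str:
    """Return airflow.cfg text switched to LocalExecutor and a new metadata DB.

    interpolation=None because airflow.cfg contains literal '%' characters
    (e.g. in log format strings) that default interpolation would reject.
    """
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("core"):
        parser.add_section("core")
    # Both keys live in the [core] section (assumption: Airflow 1.x layout)
    parser.set("core", "executor", "LocalExecutor")
    parser.set("core", "sql_alchemy_conn", conn_uri)
    out = io.StringIO()
    parser.write(out)
    return out.getvalue()


sample = """[core]
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////home/user/airflow/airflow.db
"""
# Hypothetical credentials; replace with your own database
print(configure_local_executor(
    sample, "postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"))
```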
Running tasks in parallel in Apache Airflow, for example with the CeleryExecutor or on Kubernetes, can save a lot of time; at that scale, even 1000 parallel tasks can reportedly be executed in about 5 minutes.
Note that if the DAG is currently running, the Airflow scheduler will restart any tasks you delete. So either stop the DAG first by changing its state, or stop the scheduler (if you are running in a test environment).
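You can limit your task instances by specifying a pool.
1. Create a pool (for example my_pool) in the web UI under Admin -> Pools. Its number of slots is the maximum number of task instances that can run in it at the same time.
2. Then set up your DAGs to use this pool: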
from datetime import datetime

from airflow import DAG

default_args = {
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime(2017, 12, 16),
    'pool': 'my_pool',  # every task in this DAG is assigned to my_pool
}

dag = DAG(
    dag_id='foo',
    schedule_interval='@daily',
    default_args=default_args,
)
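A pool behaves like a counting semaphore shared by every task assigned to it: a task instance has to get a free slot before it starts. The toy model below uses plain Python threads, not Airflow, with threading.BoundedSemaphore standing in for the pool; run_with_pool is a hypothetical helper that reports the peak number of tasks running at the same time.

```python
import threading
import time


def run_with_pool(n_tasks: int, slots: int, duration: float = 0.05) -> int:
    """Run n_tasks threads that each need one pool slot; return the peak
    number of tasks observed running at the same time."""
    pool = threading.BoundedSemaphore(slots)  # stand-in for an Airflow pool
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with pool:  # block until a slot is free
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(duration)  # pretend to do some work
            with lock:
                running -= 1

    threads = [threading.Thread(target=task) for _ in range(n_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


print(run_with_pool(10, slots=1))  # 1: the tasks run strictly one by one
print(run_with_pool(10, slots=3))  # at most 3 tasks at once
```

Because the pool is shared by every task assigned to it, a one-slot pool serializes those tasks even when they belong to different DAG runs or different DAGs.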