I would like to change the dag_concurrency
parameter for a specific Airflow DAG. There is a global dag_concurrency
parameter in airflow.cfg,
but is it possible to set different values for different DAGs?
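For reference, the global default I mean is the one in airflow.cfg; the option name below is the Airflow 1.x one used in my setup:

# airflow.cfg (excerpt, Airflow 1.x option name)
[core]
# global default for how many task instances a single DAG may run at once
dag_concurrency = 16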
I tried adding a concurrency parameter to the SSHExecuteOperator
task in my DAG code, but the concurrency value still shows the default (16) in the DAG details.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 0
}

# server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'ed_data_quality_20min-v1.6.6',
        default_args=default_args,
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24)) as dag:

    (
        dag
        >> SSHExecuteOperator(
            task_id='run_remote_ed_data_quality_20min',
            bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
            ssh_hook=sshHookEtl,
            retries=0,
            concurrency=1,  # this is where I tried to set concurrency, with no effect
            dag=dag)
    )
Here are the DAG details, where concurrency still shows 16.
concurrency: this is the maximum number of task instances allowed to run concurrently across all active runs of a given DAG. It lets you allow one DAG to run 32 tasks at once while another DAG can only run 16 tasks at once.
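For example (the DAG ids and schedule below are made up purely for illustration), two DAGs can declare different limits:

from airflow import DAG
from datetime import datetime

# hypothetical DAGs, only to illustrate per-DAG concurrency limits
heavy_dag = DAG(
    'example_heavy_dag',
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily',
    concurrency=32)  # this DAG may run up to 32 task instances at once

light_dag = DAG(
    'example_light_dag',
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily',
    concurrency=16)  # this DAG is capped at 16 task instances at once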
I found the solution. I was not adding the concurrency parameter in the right place. It should be passed to the DAG
object directly, not to the SSHExecuteOperator
task. Here is the new code:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 0
}

# server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'ed_data_quality_20min-v1.6.6',
        default_args=default_args,
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24),
        concurrency=1) as dag:  # DAG-level limit: at most 1 task instance at a time

    (
        dag
        >> SSHExecuteOperator(
            task_id='run_remote_ed_data_quality_20min',
            bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
            ssh_hook=sshHookEtl,
            retries=0,
            dag=dag)
    )
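Side note: on newer Airflow releases (2.2+, as far as I know) the concurrency argument on DAG is deprecated in favour of max_active_tasks, and the global dag_concurrency config option became max_active_tasks_per_dag. A rough sketch of the same DAG under that naming (tasks omitted, start date is a placeholder):

from airflow import DAG
from datetime import datetime, timedelta

# assumed Airflow 2.2+ naming: max_active_tasks replaces the deprecated concurrency argument
with DAG(
        'ed_data_quality_20min-v1.6.6',
        start_date=datetime(2021, 1, 1),  # placeholder start date for the sketch
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24),
        max_active_tasks=1) as dag:
    ...  # define the SSH task here as before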