
How to define a timeout for Apache Airflow DAGs?

Tags:

airflow

I'm using Airflow 1.10.2 but Airflow seems to ignore the timeout I've set for the DAG.

I'm setting a timeout period for the DAG using the dagrun_timeout parameter (e.g. 20 seconds) and I've got a task which takes 2 mins to run, but Airflow marks the DAG as successful!

import time
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'me',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,
}

dag = DAG(
    'test_timeout',
    schedule_interval=None,
    default_args=args,
    dagrun_timeout=timedelta(seconds=20),
)

def this_passes(**kwargs):
    return

def this_passes_with_delay(**kwargs):
    time.sleep(120)
    return

would_succeed = PythonOperator(
    task_id='would_succeed',
    dag=dag,
    python_callable=this_passes,
    email=to,
)

would_succeed_with_delay = PythonOperator(
    task_id='would_succeed_with_delay',
    dag=dag,
    python_callable=this_passes_with_delay,
    email=to,
)

would_succeed >> would_succeed_with_delay

No error messages are thrown. Am I using an incorrect parameter?

asked Jul 19 '19 by Babak Tourani

People also ask

What is DAG run timeout?

:param dagrun_timeout: specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns, and only once the # of active DagRuns == max_active_runs.

What is timeout in Airflow?

Timeouts. If you want a task to have a maximum runtime, set its execution_timeout attribute to a datetime.timedelta value that is the maximum permissible runtime. If the task runs longer than this, Airflow will kick in and fail the task with a timeout exception.
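As an aside, on Unix Airflow's execution_timeout guard is built on SIGALRM. The sketch below only emulates that idea with a hypothetical TaskTimeout exception (it is not Airflow's actual AirflowTaskTimeout class): a 1-second budget interrupts a 3-second "task":

```python
import signal
import time

class TaskTimeout(Exception):
    """Hypothetical stand-in for Airflow's timeout exception (illustration only)."""

def _raise_timeout(signum, frame):
    raise TaskTimeout("task exceeded its execution_timeout")

# Simplified sketch: arm a 1-second alarm around a task body that
# would take 3 seconds, so the alarm fires first and fails the task.
signal.signal(signal.SIGALRM, _raise_timeout)
signal.alarm(1)
try:
    time.sleep(3)          # the long-running task body
    status = "success"
except TaskTimeout:
    status = "failed: timeout"
finally:
    signal.alarm(0)        # always clear the pending alarm

print(status)  # → failed: timeout
```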

How do you stop DAGs from running in Airflow?

Please note that if the DAG is currently running, the Airflow scheduler will restart any tasks you delete. So either stop the DAG first by changing its state, or stop the scheduler (if you are running in a test environment).

How do you parameterize Airflow DAG?

You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf['key'] }}" in a templated field.
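The --conf payload is plain JSON, so what a template ultimately sees is the parsed dictionary. A minimal sketch of that round trip (no Airflow needed here; the CLI invocation and Jinja rendering are only emulated):

```python
import json

# What you would pass on the CLI, e.g.:
#   airflow trigger_dag test_timeout --conf '{"key":"value"}'
conf_payload = '{"key":"value"}'

# Airflow parses the JSON and exposes it as dag_run.conf in templates;
# "{{ dag_run.conf['key'] }}" then renders to the looked-up value.
dag_run_conf = json.loads(conf_payload)
print(dag_run_conf['key'])  # → value
```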


1 Answer

As stated in the source code:

:param dagrun_timeout: specify how long a DagRun should be up before
    timing out / failing, so that new DagRuns can be created. The timeout
    is only enforced for scheduled DagRuns, and only once the
    # of active DagRuns == max_active_runs.

so this might be expected behavior, as you set schedule_interval=None. Here, the idea is rather to make sure a scheduled DAG won't last forever and block subsequent run instances.
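The check this describes boils down to a simple elapsed-time comparison. A simplified sketch (not Airflow's actual scheduler code) with the numbers from the question, a 20-second dagrun_timeout against a run that has been up for 2 minutes:

```python
from datetime import datetime, timedelta

dagrun_timeout = timedelta(seconds=20)
start_date = datetime(2019, 7, 19, 10, 0, 0)  # illustrative start time

# Pretend the DagRun has been up for 2 minutes, like the delayed task.
now = start_date + timedelta(seconds=120)

# A run whose elapsed time exceeds dagrun_timeout is eligible to be
# timed out / failed, freeing the slot for a new DagRun.
timed_out = now - start_date > dagrun_timeout
print(timed_out)  # → True
```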

Now, you may be interested in the execution_timeout available in all operators. For example, you could set a 60s timeout on your PythonOperator like this:

would_succeed_with_delay = PythonOperator(
    task_id='would_succeed_with_delay',
    dag=dag,
    execution_timeout=timedelta(seconds=60),
    python_callable=this_passes_with_delay,
    email=to,
)
answered Sep 19 '22 by Lucas