I'm using Airflow 1.10.2, but Airflow seems to ignore the timeout I've set for the DAG.
I'm setting a timeout for the DAG with the dagrun_timeout parameter (e.g. 20 seconds), and I have a task that takes 2 minutes to run, but Airflow still marks the DAG as successful!
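from datetime import timedelta
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'me',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,
}

dag = DAG(
    'test_timeout',
    schedule_interval=None,  # no schedule: runs are only triggered manually
    default_args=args,
    dagrun_timeout=timedelta(seconds=20),
)


def this_passes(**kwargs):
    return


def this_passes_with_delay(**kwargs):
    # Sleeps far longer than the 20-second dagrun_timeout set above.
    time.sleep(120)
    return


# `to` (the notification address) is assumed to be defined elsewhere; it is not shown in the question.
would_succeed = PythonOperator(
    task_id='would_succeed',
    dag=dag,
    python_callable=this_passes,
    email=to,
)

would_succeed_with_delay = PythonOperator(
    task_id='would_succeed_with_delay',
    dag=dag,
    python_callable=this_passes_with_delay,
    email=to,
)

would_succeed >> would_succeed_with_delay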
No error messages are thrown. Am I using an incorrect parameter?
Timeouts: if you want a task to have a maximum runtime, set its execution_timeout attribute to a datetime.timedelta value that is the maximum permissible runtime. If the task runs longer than this, Airflow fails it with a timeout exception.
Note that if the DAG is currently running, the Airflow scheduler will restart the tasks you delete. So either stop the DAG first by changing its state, or stop the scheduler (if you are running in a test environment).
You can pass parameters from the CLI using --conf '{"key":"value"}' and then reference them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
As stated in the source code:
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns, and only once the
# of active DagRuns == max_active_runs.
So this might be expected behavior, since you set schedule_interval=None. The idea here is rather to make sure a scheduled DAG won't run forever and block subsequent run instances.
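To make the docstring's conditions concrete, here is a minimal sketch in plain Python. It is not Airflow's actual scheduler code: dagrun_would_time_out and its arguments are hypothetical names, and the limit of 16 active runs is just an illustrative value. It only encodes the two conditions quoted above (the run must be a scheduled one, and the number of active runs must have reached max_active_runs) plus the elapsed-time check.

```python
from datetime import datetime, timedelta


def dagrun_would_time_out(started_at, now, dagrun_timeout,
                          is_scheduled_run, active_runs, max_active_runs):
    """Hypothetical helper: True only if every condition from the docstring holds."""
    # No timeout configured, or a manually triggered run: never enforced.
    if dagrun_timeout is None or not is_scheduled_run:
        return False
    # Only enforced once the active runs have reached max_active_runs.
    if active_runs < max_active_runs:
        return False
    # Finally, the run must have been up longer than the timeout.
    return now - started_at > dagrun_timeout


start = datetime(2019, 1, 1, 12, 0, 0)
now = start + timedelta(seconds=120)   # the run has been up for 2 minutes
timeout = timedelta(seconds=20)

# Manually triggered run (the situation in the question): not timed out.
manual = dagrun_would_time_out(start, now, timeout, False, 1, 16)
# Scheduled run with the active-run limit reached: timed out.
scheduled_full = dagrun_would_time_out(start, now, timeout, True, 1, 1)
# Scheduled run, but still room for more active runs: not timed out.
scheduled_room = dagrun_would_time_out(start, now, timeout, True, 1, 16)

print(manual, scheduled_full, scheduled_room)  # False True False
```

Under these assumptions, the DAG in the question falls into the first case, which would explain why the 20-second timeout never fires.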
Now, you may be interested in the execution_timeout parameter, which is available on all operators.
For example, you could set a 60s timeout on your PythonOperator like this:
would_succeed_with_delay = PythonOperator(
    task_id='would_succeed_with_delay',
    dag=dag,
    execution_timeout=timedelta(seconds=60),
    python_callable=this_passes_with_delay,
    email=to,
)
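For intuition about what a per-task timeout does, here is a generic sketch of one way to enforce a time limit on a Python callable. It is an illustration only, not Airflow's implementation: run_with_timeout and TaskTimeout are hypothetical names, and the approach assumes a Unix-like system and code running in the main thread, since it relies on SIGALRM.

```python
import signal
import time


class TaskTimeout(Exception):
    """Hypothetical exception standing in for a task timeout error."""


def run_with_timeout(func, seconds):
    """Run func(), raising TaskTimeout if it takes longer than `seconds`."""
    def on_alarm(signum, frame):
        raise TaskTimeout("timed out after %s seconds" % seconds)

    # Install the handler and start a one-shot real-time timer.
    previous = signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return func()
    finally:
        # Cancel any pending timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def slow_task():
    # Stands in for this_passes_with_delay, scaled down to 2 seconds.
    time.sleep(2)
    return "finished"


try:
    outcome = run_with_timeout(slow_task, 0.2)
except TaskTimeout:
    outcome = "timed out"

quick = run_with_timeout(lambda: "finished", 0.2)

print(outcome, quick)  # timed out finished
```

The same idea applies to the operator above: with execution_timeout=timedelta(seconds=60) and a callable that sleeps for 120 seconds, the task should fail rather than succeed.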