I'm using airflow.operators.sensors.ExternalTaskSensor to make one DAG wait for another.
dag = DAG(
    'dag2',
    default_args={
        'owner': 'Me',
        'depends_on_past': False,
        'start_date': start_datetime,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    template_searchpath="%s/me/resources/" % DAGS_FOLDER,
    schedule_interval="{} {} * * *".format(minute, hour),
    max_active_runs=1
)
wait_for_dag1 = ExternalTaskSensor(
    task_id='wait_for_dag1',
    external_dag_id='dag1',
    external_task_id='dag1_task1',
    dag=dag
)
If something goes seriously wrong with the upstream DAG and it fails to complete within the given time period, I want the downstream DAG (the ExternalTaskSensor task) to fail as well, instead of hanging forever.
How can I add a timeout to ExternalTaskSensor? I've looked through the documentation, but it does not seem to have a timeout parameter or anything similar. What should I do?
https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html
timeout : The maximum amount of time in seconds that the sensor should check the condition for. If the condition has not been met when this time is reached, the task fails.
Airflow Documentation. soft_fail : Defines what happens when the sensor fails. If set to False (the default), the task fails and can be retried; if set to True, the task is marked as skipped on failure instead. If you want the sensor to keep retrying, just make sure it is left at False.
Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then succeed so their downstream tasks can run.
Setting mode='reschedule' on a sensor means that if the sensor's condition is not yet met, the sensor releases its worker slot to other tasks and is rescheduled to check again later. This is very useful when a sensor may wait for a long time.
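To make the timeout and soft_fail behaviour described above concrete, here is a minimal plain-Python model of a sensor's polling loop. It is only a sketch, not Airflow's actual implementation: run_sensor, SensorTimeout, SensorSkipped and demo are hypothetical names made up for this illustration, and a fake clock is used so the example finishes instantly.

```python
import time


class SensorTimeout(Exception):
    """Stand-in for the failure a sensor raises when it times out."""


class SensorSkipped(Exception):
    """Stand-in for the skip a sensor raises when soft_fail=True."""


def run_sensor(poke, timeout, poke_interval, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() until it returns True or `timeout` seconds have elapsed."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                raise SensorSkipped("condition not met; task marked as skipped")
            raise SensorTimeout("condition not met; task fails")
        sleep(poke_interval)
    return True


def demo(soft_fail):
    """Run a sensor whose condition never becomes true, using a fake clock."""
    now = [0.0]

    def fake_sleep(seconds):
        now[0] += seconds  # advance the fake clock instead of really waiting

    try:
        run_sensor(lambda: False, timeout=60, poke_interval=10,
                   soft_fail=soft_fail, clock=lambda: now[0], sleep=fake_sleep)
    except SensorTimeout:
        return "failed"
    except SensorSkipped:
        return "skipped"
    return "succeeded"


print(demo(soft_fail=False))  # failed
print(demo(soft_fail=True))   # skipped
```

With soft_fail left at False the timeout surfaces as a task failure, which is what the question asks for; with soft_fail=True the same timeout would only skip the task.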
The ExternalTaskSensor does take a timeout argument, in seconds. It inherits the argument from BaseSensorOperator (https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html). If you pass it timeout=60 on instantiation, it will fail after 60 seconds if the external task has not completed by then.
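Applied to the sensor from the question, that could look roughly like the sketch below. Some assumptions: the two-hour timeout and 60-second poke_interval are only illustrative values, SENSOR_KWARGS and make_wait_for_dag1 are names invented for this example, and the import path shown is the Airflow 2.x one (it differs in other Airflow versions, and mode is not available in very old releases).

```python
from datetime import timedelta

# Keyword arguments for the sensor. timeout, poke_interval, mode and soft_fail
# are inherited from BaseSensorOperator; the values here are illustrative.
SENSOR_KWARGS = {
    "task_id": "wait_for_dag1",
    "external_dag_id": "dag1",
    "external_task_id": "dag1_task1",
    "timeout": int(timedelta(hours=2).total_seconds()),  # give up after 2 hours
    "poke_interval": 60,    # check once a minute
    "mode": "reschedule",   # free the worker slot between checks
    "soft_fail": False,     # fail the task on timeout instead of skipping it
}


def make_wait_for_dag1(dag):
    """Build the sensor for the given DAG (hypothetical helper)."""
    # Imported lazily so this module can be loaded without Airflow installed.
    # Assumption: Airflow 2.x import path; adjust for your version.
    from airflow.sensors.external_task import ExternalTaskSensor

    return ExternalTaskSensor(dag=dag, **SENSOR_KWARGS)
```

In the DAG file you would then call wait_for_dag1 = make_wait_for_dag1(dag), or simply pass the same keyword arguments to ExternalTaskSensor directly.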