I'm trying to use ExternalTaskSensor, and it gets stuck poking another DAG's task, which has already completed successfully.
Here, a first DAG "a" completes its task, and after that a second DAG "b" is supposed to be triggered through ExternalTaskSensor. Instead, it gets stuck poking for a.first_task.
First DAG:
import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)


def do_first_task():
    print('First task is done')


PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)
Second DAG:
import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)


def do_second_task():
    print('Second task is done')


ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
    PythonOperator(
        task_id='second_task',
        python_callable=do_second_task,
        dag=dag)
What am I missing here?
ExternalTaskSensor assumes that you are dependent on a task in a DAG run with the same execution date. This means that in your case DAGs a and b need to run on the same schedule (e.g. every day at 9:00am or whatever).
Otherwise you need to pass execution_delta or execution_date_fn when you instantiate the ExternalTaskSensor.
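To make the "same execution date" rule concrete, here is a minimal pure-Python sketch of how the lookup date is derived. It is a simplified model, not the sensor's actual source: target_execution_date is a hypothetical helper, and the run times are made up for illustration. It also suggests why two manually triggered DAGs with schedule_interval=None are unlikely to line up, assuming each run gets its own trigger timestamp as its execution date.

```python
import datetime


def target_execution_date(execution_date, execution_delta=None, execution_date_fn=None):
    """Hypothetical helper: the execution date the sensor would look for."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("Pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        # The delta is subtracted, so a positive timedelta looks into the past.
        return execution_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(execution_date)
    # Default: the same execution date as the sensor's own DAG run.
    return execution_date


# Illustrative (assumed) run times: "a" ran at 09:00, "b" runs at 09:30.
a_run = datetime.datetime(2016, 1, 1, 9, 0)
b_run = datetime.datetime(2016, 1, 1, 9, 30)

# Without a delta the sensor looks for a 09:30 run of "a", which does not exist.
default_matches = target_execution_date(b_run) == a_run

# With a positive 30-minute delta it looks for the 09:00 run instead.
delta_matches = target_execution_date(
    b_run, execution_delta=datetime.timedelta(minutes=30)) == a_run

print(default_matches, delta_matches)
```

In the DAG file itself, the equivalent would be passing execution_delta=datetime.timedelta(minutes=30) to the ExternalTaskSensor in "b", provided the two DAGs really run on schedules that are 30 minutes apart.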
Here is the documentation inside the operator itself to help clarify further:
:param execution_delta: time difference with the previous execution to
    look at, the default is the same execution_date as the current task.
    For yesterday, use [positive!] datetime.timedelta(days=1). Either
    execution_delta or execution_date_fn can be passed to
    ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
    and returns the desired execution date to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
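When the offset between the two DAGs is not a fixed timedelta, execution_date_fn may be the better fit. The sketch below assumes a scenario that is not in the question: "a" runs once a day at midnight and "b" runs several times a day. midnight_of_same_day is a hypothetical callable, and the sensor wiring is left as a comment because it needs an Airflow installation.

```python
import datetime


def midnight_of_same_day(execution_date):
    """Hypothetical execution_date_fn: map any run to 00:00 of the same day."""
    return execution_date.replace(hour=0, minute=0, second=0, microsecond=0)


# Wiring in DAG "b" (requires Airflow, so shown as a comment only):
# ExternalTaskSensor(
#     task_id='wait_for_the_first_task_to_be_completed',
#     external_dag_id='a',
#     external_task_id='first_task',
#     execution_date_fn=midnight_of_same_day,
#     dag=dag)

# A 09:30 run of "b" would wait for the 00:00 run of "a" on the same day.
looked_up = midnight_of_same_day(datetime.datetime(2016, 1, 1, 9, 30))
print(looked_up)  # 2016-01-01 00:00:00
```

As the docstring says, this callable replaces execution_delta rather than combining with it.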