Airflow ExternalTaskSensor gets stuck


I'm trying to use ExternalTaskSensor, and it gets stuck poking another DAG's task even though that task has already completed successfully.

Here, DAG "a" runs its task first_task, and after that DAG "b" is supposed to run, using an ExternalTaskSensor to wait for a.first_task. Instead, the sensor gets stuck poking for a.first_task.

First DAG:

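import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
)

def do_first_task():
    print('First task is done')

# The task that DAG "b" waits for: a.first_task
PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)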


Second DAG:

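import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
)

def do_second_task():
    print('Second task is done')

# Poke until a.first_task succeeds, then run the second task (these two task_ids are placeholders)
ExternalTaskSensor(
    task_id='wait_for_first_task',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)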

What am I missing here?

asked Oct 18 '17 09:10 by Aleksei Solovev


1 Answer

ExternalTaskSensor assumes that you depend on a task in a DAG run with the same execution date as the sensor's own run.

This means that in your case DAGs a and b need to run on the same schedule (e.g. both every day at 9:00am, or whatever schedule you choose).
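To make the exact-match rule concrete, here is a minimal pure-Python sketch; it is not Airflow code. It assumes the metadata database can be modelled as a set of (dag_id, task_id, execution_date) tuples for successful task instances, and checks whether a sensor running with a given execution date would find its target. The function name sensor_would_succeed and the example times are hypothetical.

```python
from datetime import datetime


def sensor_would_succeed(completed, external_dag_id, external_task_id, execution_date):
    """Return True if the external task succeeded for exactly this execution date.

    Simplified model of the default behaviour: with neither execution_delta
    nor execution_date_fn set, the sensor looks for the external task at the
    same execution date as its own run.
    """
    return (external_dag_id, external_task_id, execution_date) in completed


# Hypothetical history: DAG "a" has a successful first_task run at 09:00.
completed = {("a", "first_task", datetime(2017, 10, 18, 9, 0))}

# Same schedule: DAG "b" also runs at 09:00, so the sensor finds a match.
print(sensor_would_succeed(completed, "a", "first_task", datetime(2017, 10, 18, 9, 0)))   # True

# Different schedule: "b" runs at 09:30, so there is no match and it keeps poking.
print(sensor_would_succeed(completed, "a", "first_task", datetime(2017, 10, 18, 9, 30)))  # False
```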

Otherwise, you need to pass execution_delta or execution_date_fn when you instantiate the ExternalTaskSensor, so that it looks up the matching execution date in the other DAG.
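As a worked example of the execution_delta arithmetic, assume (hypothetically) that b is scheduled daily at 09:30 and a daily at 09:00. The sensor subtracts execution_delta from its own execution date, so a delta of 30 minutes points b's 09:30 run at a's 09:00 run. The helper date_to_query below is a hypothetical name; it only reproduces that subtraction and does not call Airflow.

```python
from datetime import datetime, timedelta


def date_to_query(own_execution_date, execution_delta):
    """Execution date the sensor looks up: its own date minus execution_delta."""
    return own_execution_date - execution_delta


b_run = datetime(2017, 10, 18, 9, 30)  # hypothetical run of DAG "b"
delta = timedelta(minutes=30)          # b runs 30 minutes after a

print(date_to_query(b_run, delta))  # 2017-10-18 09:00:00
```

In the real DAG this corresponds to passing execution_delta=timedelta(minutes=30) to ExternalTaskSensor. The delta is positive even though it points at an earlier run, as the docstring notes.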

Here is the relevant part of the operator's own docstring:

:param execution_delta: time difference with the previous execution to
    look at, the default is the same execution_date as the current task.
    For yesterday, use [positive!] datetime.timedelta(days=1). Either
    execution_delta or execution_date_fn can be passed to
    ExternalTaskSensor, but not both.

:type execution_delta: datetime.timedelta

:param execution_date_fn: function that receives the current execution date
    and returns the desired execution date to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.

:type execution_date_fn: callable
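The rules in that docstring can be summarised in a small pure-Python sketch. It is not Airflow's implementation; it only mirrors the documented behaviour: subtract execution_delta, or call execution_date_fn on the current execution date, or fall back to the same date, and reject both at once. resolve_query_date and midnight_of are hypothetical names. midnight_of illustrates an execution_date_fn for a case where, say, an hourly DAG waits on a daily DAG scheduled at midnight.

```python
from datetime import datetime, timedelta


def resolve_query_date(execution_date, execution_delta=None, execution_date_fn=None):
    """Pick the external execution date to query, following the docstring's rules."""
    # Only one of the two parameters may be given.
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        return execution_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(execution_date)
    # Default: the same execution date as the sensor's own run.
    return execution_date


def midnight_of(dt):
    """Hypothetical execution_date_fn: map any time to that day's midnight run."""
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


hourly_run = datetime(2017, 10, 18, 14, 0)
print(resolve_query_date(hourly_run))                                     # 2017-10-18 14:00:00
print(resolve_query_date(hourly_run, execution_delta=timedelta(days=1)))  # 2017-10-17 14:00:00
print(resolve_query_date(hourly_run, execution_date_fn=midnight_of))      # 2017-10-18 00:00:00
```

The timedelta(days=1) call is the docstring's "for yesterday" case.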
answered Sep 30 '22 03:09 by jhnclvr
