Why tasks are stuck in None state in Airflow 1.10.2 after a trigger_dag

I have a dummy DAG that I want to start episodically by setting its start_date to today and leaving its schedule_interval as daily.

Here is the DAG code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- airflow: DAG -*-
import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

logger = logging.getLogger("DummyDAG")


def execute_python_function():
    # use the module-level logger defined above
    logger.info("HEY YO !!!")
    return True


dag = DAG(dag_id='dummy_dag',
          start_date=datetime.today())

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

py_operator = PythonOperator(task_id='exec_function',
                             python_callable=execute_python_function,
                             dag=dag)

start >> py_operator >> end

In Airflow 1.9.0, when I run airflow trigger_dag dummy_dag -e 20190701, the DAG Run is created, and the Task Instances are created, scheduled, and executed.

However, in Airflow 1.10.2 the DAG Run and the Task Instances are also created, but the Task Instances stay stuck in the None state.

For both versions, depends_on_past is False.

Here are the details of the start task in Airflow 1.9.0 (it is executed successfully after some time):

Task Instance Details

Dependencies Blocking Task From Getting Scheduled

Dependency:         Reason
Dagrun Running:     Task instance's dagrun was not in the 'running' state but in the state 'success'.
Task Instance State:    Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
Execution Date:     The execution date is 2019-07-10T00:00:00 but this is before the task's start date 2019-07-11T08:45:18.230876.
Execution Date:     The execution date is 2019-07-10T00:00:00 but this is before the task's DAG's start date 2019-07-11T08:45:18.230876.

Task Instance Attributes

Attribute   Value
dag_id  dummy_dag
duration    None
end_date    2019-07-10 16:32:10.372976
execution_date  2019-07-10 00:00:00
generate_command    <function generate_command at 0x7fc9fcc85b90>
hostname    airflow-worker-5dc5b999b6-2l5cp
is_premature    False
job_id  None
key     ('dummy_dag', 'start', datetime.datetime(2019, 7, 10, 0, 0))
log     <logging.Logger object at 0x7fca014e7f10>
log_filepath    /home/airflow/gcs/logs/dummy_dag/start/2019-07-10T00:00:00.log
log_url     https://i39907f7014685e91-tp.appspot.com/admin/airflow/log?dag_id=dummy_dag&task_id=start&execution_date=2019-07-10T00:00:00
logger  <logging.Logger object at 0x7fca014e7f10>
mark_success_url    https://i39907f7014685e91-tp.appspot.com/admin/airflow/success?task_id=start&dag_id=dummy_dag&execution_date=2019-07-10T00:00:00&upstream=false&downstream=false
max_tries   0
metadata    MetaData(bind=None)
next_try_number     2
operator    None
pid     180712
pool    None
previous_ti     None
priority_weight     3
queue   default
queued_dttm     None
run_as_user     None
start_date  2019-07-10 16:32:08.483531
state   success
task    <Task(DummyOperator): start>
task_id     start
test_mode   False
try_number  2
unixname    airflow

Task Attributes

Attribute   Value
adhoc   False
dag     <DAG: dummy_dag>
dag_id  dummy_dag
depends_on_past     False
deps    set([<TIDep(Not In Retry Period)>, <TIDep(Previous Dagrun State)>, <TIDep(Trigger Rule)>])
downstream_list     [<Task(PythonOperator): exec_function>]
downstream_task_ids     ['exec_function']
email   None
email_on_failure    True
email_on_retry  True
end_date    None
execution_timeout   None
log     <logging.Logger object at 0x7fc9e2085350>
logger  <logging.Logger object at 0x7fc9e2085350>
max_retry_delay     None
on_failure_callback     None
on_retry_callback   None
on_success_callback     None
owner   Airflow
params  {}
pool    None
priority_weight     1
priority_weight_total   3
queue   default
resources   {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
retries     0
retry_delay     0:05:00
retry_exponential_backoff   False
run_as_user     None
schedule_interval   1 day, 0:00:00
sla     None
start_date  2019-07-11 08:45:18.230876
task_concurrency    None
task_id     start
task_type   DummyOperator
template_ext    []
template_fields     ()
trigger_rule    all_success
ui_color    #e8f7e4
ui_fgcolor  #000
upstream_list   []
upstream_task_ids   []
wait_for_downstream     False

And here are the details of the start task in Airflow 1.10.2:

Task Instance Details
Dependencies Blocking Task From Getting Scheduled

Dependency  Reason
Execution Date  The execution date is 2019-07-11T00:00:00+00:00 but this is before the task's start date 2019-07-11T08:53:32.593360+00:00.
Execution Date  The execution date is 2019-07-11T00:00:00+00:00 but this is before the task's DAG's start date 2019-07-11T08:53:32.593360+00:00.


Task Instance Attributes

Attribute   Value
dag_id  dummy_dag
duration    None
end_date    None
execution_date  2019-07-11T00:00:00+00:00
executor_config     {}
generate_command    <function generate_command at 0x7f4621301578>
hostname    
is_premature    False
job_id  None
key     ('dummy_dag', 'start', <Pendulum [2019-07-11T00:00:00+00:00]>, 1)
log     <logging.Logger object at 0x7f4624883350>
log_filepath    /home/airflow/gcs/logs/dummy_dag/start/2019-07-11T00:00:00+00:00.log
log_url     https://a15d189066a5c65ee-tp.appspot.com/admin/airflow/log?dag_id=dummy_dag&task_id=start&execution_date=2019-07-11T00%3A00%3A00%2B00%3A00
logger  <logging.Logger object at 0x7f4624883350>
mark_success_url    https://a15d189066a5c65ee-tp.appspot.com/admin/airflow/success?task_id=start&dag_id=dummy_dag&execution_date=2019-07-11T00%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries   0
metadata    MetaData(bind=None)
next_try_number     1
operator    None
pid     None
pool    None
previous_ti     None
priority_weight     3
queue   default
queued_dttm     None
raw     False
run_as_user     None
start_date  None
state   None
task    <Task(DummyOperator): start>
task_id     start
test_mode   False
try_number  1
unixname    airflow


Task Attributes


Attribute   Value
adhoc   False
dag     <DAG: dummy_dag>
dag_id  dummy_dag
depends_on_past     False
deps    set([<TIDep(Previous Dagrun State)>, <TIDep(Trigger Rule)>, <TIDep(Not In Retry Period)>])
downstream_list     [<Task(PythonOperator): exec_function>]
downstream_task_ids     set(['exec_function'])
email   None
email_on_failure    True
email_on_retry  True
end_date    None
execution_timeout   None
executor_config     {}
inlets  []
lineage_data    None
log     <logging.Logger object at 0x7f460b467e10>
logger  <logging.Logger object at 0x7f460b467e10>
max_retry_delay     None
on_failure_callback     None
on_retry_callback   None
on_success_callback     None
outlets     []
owner   Airflow
params  {}
pool    None
priority_weight     1
priority_weight_total   3
queue   default
resources   {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
retries     0
retry_delay     0:05:00
retry_exponential_backoff   False
run_as_user     None
schedule_interval   1 day, 0:00:00
sla     None
start_date  2019-07-11T08:53:32.593360+00:00
task_concurrency    None
task_id     start
task_type   DummyOperator
template_ext    []
template_fields     ()
trigger_rule    all_success
ui_color    #e8f7e4
ui_fgcolor  #000
upstream_list   []
upstream_task_ids   set([])
wait_for_downstream     False
weight_rule     downstream

Asked Jul 11 '19 by MassyB

1 Answer

IMO it's not a problem with the version. If you look at the task instance details, you will see messages like:

Execution Date:
The execution date is 2019-07-10T00:00:00 but this is before the task's start date 2019-07-11T08:45:18.230876.

The execution date is the one you pass to the trigger_dag command, whereas the start date of your DAG keeps changing because Python's datetime.today() returns the current time each time the DAG file is parsed. To see that, you can do:

airflow@e3bc9a0a7a3e:~$ airflow trigger_dag dummy_dag -e 20190702

Then go to http://localhost:8080/admin/airflow/task?dag_id=dummy_dag&task_id=start&execution_date=2019-07-02T00%3A00%3A00%2B00%3A00 (or the corresponding URL for your deployment) and refresh the page. You should see the Dependency > Execution date message change every time.
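The root cause is visible in plain Python, independently of Airflow: datetime.today() is re-evaluated on every parse of the DAG file, so the DAG's start date keeps moving forward. A minimal illustration:

from datetime import datetime

# each call returns "now"; a DAG file that is parsed repeatedly
# therefore gets an ever-advancing start_date
print(datetime.today())  # e.g. 2019-07-11 08:53:32.593360
print(datetime.today())  # a moment later: already greater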

In your case it's problematic because you're trying to trigger a DAG run for a date in the past. A better way is to specify a static date, or to use one of Airflow's utility methods to compute one:

dag = DAG(dag_id='dummy_dag',
          start_date=datetime(2019, 7, 11, 0, 0))
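As an example of such a utility method, here is a minimal sketch using days_ago from airflow.utils.dates (assuming it is available in your Airflow version): it returns midnight N days ago, so any freshly triggered run's execution date lands safely after the DAG's start date:

from airflow.utils.dates import days_ago

dag = DAG(dag_id='dummy_dag',
          start_date=days_ago(2))  # midnight two days ago, always behind a fresh trigger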

Otherwise, if you want to reprocess historical data, you can use airflow backfill.
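For example (in the 1.x CLI, backfill takes -s/--start_date and -e/--end_date; double-check the flags for your version):

airflow backfill dummy_dag -s 2019-07-01 -e 2019-07-10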


Update

Running DAGs on demand

After clarifications in the comments, we found another way to trigger a DAG on demand: define it with schedule_interval=None.
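A minimal sketch of that approach, with a fixed, illustrative start_date:

from datetime import datetime

from airflow import DAG

dag = DAG(dag_id='dummy_dag',
          start_date=datetime(2019, 7, 1),
          schedule_interval=None)  # never scheduled automatically

With this, the scheduler creates no runs on its own, and you start one explicitly whenever you need it:

airflow trigger_dag dummy_dag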

Answered Sep 20 '22 by Bartosz Konieczny