I have a dummy DAG that I want to start episodically, by setting its start_date
to today and leaving its schedule interval at the default of daily.
Here is the DAG code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- airflow: DAG -*-
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger("DummyDAG")


def execute_python_function():
    logger.info("HEY YO !!!")
    return True


# schedule_interval is not set, so it defaults to timedelta(days=1), i.e. daily
dag = DAG(dag_id='dummy_dag',
          start_date=datetime.today())

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

py_operator = PythonOperator(task_id='exec_function',
                             python_callable=execute_python_function,
                             dag=dag)

start >> py_operator >> end
In Airflow 1.9.0, when I run airflow trigger_dag dummy_dag -e 20190701,
the DAG Run is created, and the Task Instances are created, scheduled and executed.
However, in Airflow 1.10.2 the DAG Run and the Task Instances are created as well, but the
Task Instances stay stuck in the None state.
For both versions, depends_on_past is False.
Here are the details of the start
task in Airflow 1.9.0 (it is eventually executed with success):
Task Instance Details
Dependencies Blocking Task From Getting Scheduled
Dependency: Reason
Dagrun Running: Task instance's dagrun was not in the 'running' state but in the state 'success'.
Task Instance State: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
Execution Date: The execution date is 2019-07-10T00:00:00 but this is before the task's start date 2019-07-11T08:45:18.230876.
Execution Date: The execution date is 2019-07-10T00:00:00 but this is before the task's DAG's start date 2019-07-11T08:45:18.230876.
Task Instance Attributes
Attribute Value
dag_id dummy_dag
duration None
end_date 2019-07-10 16:32:10.372976
execution_date 2019-07-10 00:00:00
generate_command <function generate_command at 0x7fc9fcc85b90>
hostname airflow-worker-5dc5b999b6-2l5cp
is_premature False
job_id None
key ('dummy_dag', 'start', datetime.datetime(2019, 7, 10, 0, 0))
log <logging.Logger object at 0x7fca014e7f10>
log_filepath /home/airflow/gcs/logs/dummy_dag/start/2019-07-10T00:00:00.log
log_url https://i39907f7014685e91-tp.appspot.com/admin/airflow/log?dag_id=dummy_dag&task_id=start&execution_date=2019-07-10T00:00:00
logger <logging.Logger object at 0x7fca014e7f10>
mark_success_url https://i39907f7014685e91-tp.appspot.com/admin/airflow/success?task_id=start&dag_id=dummy_dag&execution_date=2019-07-10T00:00:00&upstream=false&downstream=false
max_tries 0
metadata MetaData(bind=None)
next_try_number 2
operator None
pid 180712
pool None
previous_ti None
priority_weight 3
queue default
queued_dttm None
run_as_user None
start_date 2019-07-10 16:32:08.483531
state success
task <Task(DummyOperator): start>
task_id start
test_mode False
try_number 2
unixname airflow
Task Attributes
Attribute Value
adhoc False
dag <DAG: dummy_dag>
dag_id dummy_dag
depends_on_past False
deps set([<TIDep(Not In Retry Period)>, <TIDep(Previous Dagrun State)>, <TIDep(Trigger Rule)>])
downstream_list [<Task(PythonOperator): exec_function>]
downstream_task_ids ['exec_function']
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
log <logging.Logger object at 0x7fc9e2085350>
logger <logging.Logger object at 0x7fc9e2085350>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
owner Airflow
params {}
pool None
priority_weight 1
priority_weight_total 3
queue default
resources {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
retries 0
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 1 day, 0:00:00
sla None
start_date 2019-07-11 08:45:18.230876
task_concurrency None
task_id start
task_type DummyOperator
template_ext []
template_fields ()
trigger_rule all_success
ui_color #e8f7e4
ui_fgcolor #000
upstream_list []
upstream_task_ids []
wait_for_downstream False
And here are the details of the start task in Airflow 1.10.2:
Task Instance Details
Dependencies Blocking Task From Getting Scheduled
Dependency Reason
Execution Date The execution date is 2019-07-11T00:00:00+00:00 but this is before the task's start date 2019-07-11T08:53:32.593360+00:00.
Execution Date The execution date is 2019-07-11T00:00:00+00:00 but this is before the task's DAG's start date 2019-07-11T08:53:32.593360+00:00.
Task Instance Attributes
Attribute Value
dag_id dummy_dag
duration None
end_date None
execution_date 2019-07-11T00:00:00+00:00
executor_config {}
generate_command <function generate_command at 0x7f4621301578>
hostname
is_premature False
job_id None
key ('dummy_dag', 'start', <Pendulum [2019-07-11T00:00:00+00:00]>, 1)
log <logging.Logger object at 0x7f4624883350>
log_filepath /home/airflow/gcs/logs/dummy_dag/start/2019-07-11T00:00:00+00:00.log
log_url https://a15d189066a5c65ee-tp.appspot.com/admin/airflow/log?dag_id=dummy_dag&task_id=start&execution_date=2019-07-11T00%3A00%3A00%2B00%3A00
logger <logging.Logger object at 0x7f4624883350>
mark_success_url https://a15d189066a5c65ee-tp.appspot.com/admin/airflow/success?task_id=start&dag_id=dummy_dag&execution_date=2019-07-11T00%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 0
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool None
previous_ti None
priority_weight 3
queue default
queued_dttm None
raw False
run_as_user None
start_date None
state None
task <Task(DummyOperator): start>
task_id start
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
adhoc False
dag <DAG: dummy_dag>
dag_id dummy_dag
depends_on_past False
deps set([<TIDep(Previous Dagrun State)>, <TIDep(Trigger Rule)>, <TIDep(Not In Retry Period)>])
downstream_list [<Task(PythonOperator): exec_function>]
downstream_task_ids set(['exec_function'])
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
inlets []
lineage_data None
log <logging.Logger object at 0x7f460b467e10>
logger <logging.Logger object at 0x7f460b467e10>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
outlets []
owner Airflow
params {}
pool None
priority_weight 1
priority_weight_total 3
queue default
resources {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
retries 0
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 1 day, 0:00:00
sla None
start_date 2019-07-11T08:53:32.593360+00:00
task_concurrency None
task_id start
task_type DummyOperator
template_ext []
template_fields ()
trigger_rule all_success
ui_color #e8f7e4
ui_fgcolor #000
upstream_list []
upstream_task_ids set([])
wait_for_downstream False
weight_rule downstream
IMO it's not a problem of the version. If you check the task instance details (as shown above), you will see messages like:

Execution Date: The execution date is 2019-07-10T00:00:00 but this is before the task's start date 2019-07-11T08:45:18.230876.

The execution date is the one you pass to the trigger_dag
command, whereas the start date of your DAG keeps changing, because Python's datetime.today()
returns the current time every time the DAG file is parsed. To see that, you can do:
airflow@e3bc9a0a7a3e:~$ airflow trigger_dag dummy_dag -e 20190702
And later go to http://localhost:8080/admin/airflow/task?dag_id=dummy_dag&task_id=start&execution_date=2019-07-02T00%3A00%3A00%2B00%3A00 (or the corresponding URL in your environment) and refresh the page. You should see the Dependency > Execution date
message changing every time.
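To see why, here is a minimal plain-Python sketch (outside Airflow): every parse of the DAG file re-evaluates datetime.today(), so the start_date keeps moving forward while the execution date you triggered stays fixed in its past:

import time
from datetime import datetime

# Simulate two consecutive parses of the DAG file.
first_parse_start_date = datetime.today()
time.sleep(2)  # some time passes before the scheduler parses the file again
second_parse_start_date = datetime.today()

execution_date = datetime(2019, 7, 2)  # the fixed date passed to trigger_dag

# The start_date moves forward on every parse...
print(second_parse_start_date > first_parse_start_date)  # True
# ...so a past execution date is always before the current start_date,
# and the "Execution Date" dependency can never be satisfied.
print(execution_date < second_parse_start_date)  # True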
In your case it will be problematic, since you're trying to trigger a DAG with an execution date in the past. A better way is to specify a static date, or use one of Airflow's util methods to compute it:

dag = DAG(dag_id='dummy_dag',
          start_date=datetime(2019, 7, 11, 0, 0))
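For the util-method variant, a small sketch (assuming Airflow 1.10.x, where airflow.utils.dates.days_ago is available):

from airflow import DAG
from airflow.utils.dates import days_ago

# days_ago() snaps to midnight, so the start_date stays stable
# across scheduler parses, unlike datetime.today().
dag = DAG(dag_id='dummy_dag',
          start_date=days_ago(1))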
Otherwise, if you want to reprocess historical data, you can use airflow backfill:
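For example (1.10.x CLI; the date range here is only illustrative):

airflow backfill dummy_dag -s 2019-07-01 -e 2019-07-10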
Update: after clarifications in the comments, we found another way to trigger the DAG on demand: give it the property schedule_interval=None.
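A minimal sketch of that on-demand setup (the static start_date is again illustrative):

from datetime import datetime
from airflow import DAG

# With schedule_interval=None the scheduler never creates runs on its own;
# the DAG only runs when triggered, e.g. via `airflow trigger_dag dummy_dag`.
dag = DAG(dag_id='dummy_dag',
          start_date=datetime(2019, 7, 11, 0, 0),
          schedule_interval=None)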