
Apache Airflow: Delay a task for some period of time

Tags:

python

airflow

I am trying to execute a task 5 minutes after its parent task completes, inside a DAG.

DAG: Task 1 ----> Wait for 5 minutes ----> Task 2

How can I achieve this in Apache Airflow? Thanks in advance.

asked by Spandan Singh, Mar 05 '19 11:03



People also ask

How do I set timeout in Airflow?

Timeouts: if you want a task to have a maximum runtime, set its execution_timeout attribute to a datetime.timedelta value that is the maximum permissible runtime. This applies to all Airflow tasks, including sensors.
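Airflow enforces execution_timeout itself; on an operator you would simply pass something like execution_timeout=timedelta(minutes=10). Keep in mind that a 5-minute sleep task needs a limit longer than 5 minutes if one is set. As a rough, stdlib-only illustration of the idea (not Airflow's implementation), the sketch below aborts a block that outlives a timedelta limit. max_runtime is a hypothetical name, and the SIGALRM approach only works on Unix, in the main thread.

```python
import signal
import time
from contextlib import contextmanager
from datetime import timedelta
from typing import Iterator


@contextmanager
def max_runtime(limit: timedelta) -> Iterator[None]:
    """Hypothetical helper: raise TimeoutError if the body outlives `limit`.

    Relies on SIGALRM, so it only works on Unix and in the main thread.
    """

    def _on_alarm(signum, frame):
        raise TimeoutError(f"runtime exceeded {limit.total_seconds()} s")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    # setitimer accepts fractional seconds, unlike signal.alarm
    signal.setitimer(signal.ITIMER_REAL, limit.total_seconds())
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel a pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the previous handler


if __name__ == "__main__":
    try:
        with max_runtime(timedelta(seconds=0.2)):
            time.sleep(1)  # stands in for a task that runs too long
    except TimeoutError as exc:
        print("timed out:", exc)
```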

Can we schedule a task in Airflow?

Yes. You can have the Airflow scheduler start the process that parses the Python files in the DAGs folder into DAG objects containing the tasks to be scheduled.

What is retry delay in Airflow?

retries (int): the number of retries to perform before failing the task.
retry_delay (datetime.timedelta): the delay between retries.
retry_exponential_backoff (bool): allow progressively longer waits between retries by applying an exponential backoff algorithm to the retry delay (the delay is converted into seconds).
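A simplified model of how these parameters interact: the wait is either constant or, with backoff, doubled for each earlier retry, optionally capped (the cap mirrors the operator's max_retry_delay argument). retry_delays is a hypothetical helper; Airflow's scheduler computes the real wait itself and its exact formula is not reproduced here, so actual values may differ.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(retries: int,
                 retry_delay: timedelta,
                 exponential_backoff: bool = False,
                 max_delay: Optional[timedelta] = None) -> List[timedelta]:
    """Hypothetical model: the wait before each of `retries` retry attempts."""
    delays = []
    for attempt in range(retries):
        # constant delay, or doubled once per earlier retry when backing off
        delay = retry_delay * 2 ** attempt if exponential_backoff else retry_delay
        if max_delay is not None:
            delay = min(delay, max_delay)  # never wait longer than the cap
        delays.append(delay)
    return delays


if __name__ == "__main__":
    five = timedelta(minutes=5)
    print(retry_delays(3, five))
    print(retry_delays(3, five, exponential_backoff=True, max_delay=timedelta(minutes=15)))
```

With a 5-minute retry_delay and three retries, this model gives waits of 5, 5 and 5 minutes; with backoff they become 5, 10 and 20 minutes, or 5, 10 and 15 minutes with a 15-minute cap.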


2 Answers

This behaviour can be achieved by introducing a task that forces a delay of a specified duration between Task 1 and Task 2.


This can be done with a PythonOperator:

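import time
from airflow.operators.python import PythonOperator

# my_dag, task_1 and task_2 are assumed to be defined elsewhere in the DAG file
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=my_dag,
                                                   # block for 300 s (5 minutes), then succeed
                                                   python_callable=lambda: time.sleep(300))

task_1 >> delay_python_task >> task_2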
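If the duration should be configurable rather than a hard-coded 300, the callable can be built from a datetime.timedelta. make_delay is a hypothetical helper, not an Airflow API; the function it returns is what you would pass as python_callable.

```python
import time
from datetime import timedelta
from typing import Callable


def make_delay(wait: timedelta) -> Callable[[], None]:
    """Hypothetical helper: return a no-argument callable that blocks for `wait`."""
    seconds = wait.total_seconds()

    def _delay() -> None:
        time.sleep(seconds)  # occupies the worker slot for the whole wait

    return _delay


if __name__ == "__main__":
    start = time.monotonic()
    make_delay(timedelta(seconds=0.1))()
    print(f"waited {time.monotonic() - start:.2f} s")
```

In the DAG file this would read python_callable=make_delay(timedelta(minutes=5)). The trade-off is the same as with the lambda: the sleeping task holds a worker slot for the full five minutes.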

Or with a BashOperator:

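from airflow.operators.bash import BashOperator

# my_dag, task_1 and task_2 are assumed to be defined elsewhere in the DAG file
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
                                             dag=my_dag,
                                             # "5m" is a GNU coreutils extension; POSIX sleep takes seconds ("sleep 300")
                                             bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2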

Note: the code snippets above are NOT tested.


References

  • example_python_operator.py
  • example_bash_operator.py

UPDATE-1

Here are some other ways of introducing a delay:

  • UPDATE: do NOT use this, as pointed out by @Vit.ai. Original point: on_success_callback / on_failure_callback: depending on whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda context: time.sleep(300) in either of these params of Task 1.
  • pre_execute() / post_execute(): invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would have the same effect. Of course, this involves modifying the code of your tasks (1 or 2), so it is better avoided.

Personally, I prefer the extra-task approach because it makes things more explicit and doesn't falsely inflate the runtime of Task 1 or Task 2.
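Why the callback variant fails to delay Task 2 can be seen in a toy model (hypothetical, not Airflow's code) that follows the ordering described in these answers: post_execute runs before a task is marked successful, on_success_callback runs after, and the downstream task starts as soon as the success mark appears. simulate is a made-up name, and the delay is shortened to a fraction of a second.

```python
import threading
import time


def simulate(hook: str, delay: float = 0.3) -> float:
    """Toy model: seconds between Task 1's execute() ending and Task 2 starting.

    hook="post_execute": the sleep runs before Task 1 is marked successful.
    hook="on_success_callback": the sleep runs after Task 1 is marked successful.
    """
    if hook not in ("post_execute", "on_success_callback"):
        raise ValueError(f"unknown hook: {hook}")
    succeeded = threading.Event()
    execute_end = []

    def task_1() -> None:
        execute_end.append(time.monotonic())  # execute() has just returned
        if hook == "post_execute":
            time.sleep(delay)
        succeeded.set()  # state = success; downstream is unblocked here
        if hook == "on_success_callback":
            time.sleep(delay)

    worker = threading.Thread(target=task_1)
    worker.start()
    succeeded.wait()  # the "scheduler" waits only for the success mark
    task_2_start = time.monotonic()
    worker.join()
    return task_2_start - execute_end[0]


if __name__ == "__main__":
    for hook in ("post_execute", "on_success_callback"):
        print(f"{hook}: Task 2 started {simulate(hook):.2f} s after Task 1 finished")
```

In this model the post_execute variant delays Task 2 by the full sleep, while the callback variant lets Task 2 start almost immediately, which matches the warning in the answer below.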

answered by y2k-shubham, Oct 03 '22 21:10



@y2k-shubham gave the best answer to date; however, I want to warn against the callback solution, because the task is first marked as successful and only then is the callback executed, which means Task 2 will not see any delay. If you don't want to use a separate task, you can use something like this:

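import time
from airflow.operators.dummy import DummyOperator  # EmptyOperator (airflow.operators.empty) in Airflow 2.4+

< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
# post_execute runs before task1 is marked successful, so task2 does wait
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id='task2', dag=dag)

task1 >> task2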
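The lambda **x signature matters. Assigning a function to an instance attribute shadows the class method, and the replacement is not bound, so it receives no self; it also accepts only keyword arguments, which works as long as the caller passes them by keyword (as far as I know, Airflow 2 calls post_execute(context=..., result=...)). The stand-in class below is hypothetical, not Airflow's BaseOperator; a lambda taking *args, **kwargs tolerates either calling style.

```python
from typing import Any, List


class FakeOperator:
    """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""

    def post_execute(self, context: Any, result: Any = None) -> None:
        pass  # the default hook does nothing


calls: List[str] = []

op = FakeOperator()
# The instance attribute shadows the method; the lambda is not bound, so no `self`.
op.post_execute = lambda **kwargs: calls.append("kw:" + ",".join(sorted(kwargs)))
op.post_execute(context={}, result=None)  # keyword call: works

try:
    op.post_execute({}, None)  # positional call: a **kwargs-only lambda rejects it
except TypeError:
    calls.append("positional call rejected")

# Accepting both styles avoids depending on how the hook is invoked.
op.post_execute = lambda *args, **kwargs: calls.append("any")
op.post_execute({}, None)
op.post_execute(context={}, result=None)
print(calls)
```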
answered by Vit.ai, Oct 03 '22 21:10
