
How to add manual tasks in an Apache Airflow Dag

Tags:

airflow

I'm using Apache Airflow to manage a data processing pipeline. In the middle of the pipeline, some data needs to be reviewed before the next processing step, e.g. ... -> task1 -> human review -> task2 -> ..., where task1 and task2 are data processing tasks. When task1 finishes, the data it generated needs to be reviewed by a human. Only after the reviewer approves the data can task2 be launched. Human review may take a very long time (e.g. several weeks).

I'm thinking of using an external database to store the human review results, and a Sensor that polls for the review result at a fixed interval. But the sensor will occupy an Airflow worker until the review is done.

Any ideas?

Freedom asked Mar 07 '23 00:03


2 Answers

Piggybacking off Freedom's answer and Robert Elliot's answer, here is a full working example that gives the user two weeks to review the results of the first task before the run fails permanently:

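from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
# Airflow 2.x path; on Airflow 1.10 use airflow.operators.python_operator
from airflow.operators.python import PythonOperator

from my_tasks import first_task_callable, second_task_callable


# How long the reviewer has to sign off before the run fails permanently.
TIMEOUT = timedelta(days=14)


def task_to_fail():
    # Always fails; the reviewer approves by marking this task as success.
    raise AirflowException("Please change this step to success to continue")


dag = DAG(
    dag_id="my_dag",
    start_date=datetime(2023, 1, 1),  # any fixed date in the past
    catchup=False,  # don't backfill runs between start_date and now
)

first_task = PythonOperator(
    dag=dag,
    task_id="first_task",
    python_callable=first_task_callable
)

manual_sign_off = PythonOperator(
    dag=dag,
    task_id="manual_sign_off",
    python_callable=task_to_fail,
    # The first attempt fails at once and the task waits in up_for_retry for
    # retry_delay; marking it success during that wait lets second_task run.
    # (max_retry_delay only caps the delay when retry_exponential_backoff=True,
    # so on its own it would not extend the default 5-minute retry_delay.)
    retries=1,
    retry_delay=TIMEOUT,
)

second_task = PythonOperator(
    dag=dag,
    task_id="second_task",
    python_callable=second_task_callable
)

first_task >> manual_sign_off >> second_task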
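With this approach the review window is simple arithmetic. A minimal sketch, assuming a fixed `retry_delay` (no `retry_exponential_backoff`) and that each failing attempt ends almost instantly; the helper name `review_window` is hypothetical and scheduler latency is ignored:

```python
from datetime import timedelta


def review_window(retries: int, retry_delay: timedelta) -> timedelta:
    """Approximate time a reviewer has to mark an always-failing task as success.

    Each attempt fails immediately, so the window is the sum of the fixed
    waits between attempts: retries * retry_delay.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    return retries * retry_delay


# One retry with a 14-day delay gives the reviewer about two weeks.
print(review_window(1, timedelta(days=14)))  # 14 days, 0:00:00
# Two retries with a 7-day delay give the same window, at the cost of
# one extra (instantly failing) attempt.
print(review_window(2, timedelta(days=7)))  # 14 days, 0:00:00
```

A single retry with a long delay keeps the task history short; splitting the window across several retries only adds failed attempts to the log.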
turnerm answered Apr 08 '23 13:04


A colleague suggested having a task that always fails, so the manual step is simply to mark it as a success. I implemented it like this:

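from airflow.exceptions import AirflowException
# Airflow 2.x path; on Airflow 1.10 use airflow.operators.python_operator
from airflow.operators.python import PythonOperator


def always_fail():
    # Fails on every run; a human approves by marking this task as success.
    raise AirflowException('Please change this step to success to continue')


# `dag`, `start` and `end` are defined elsewhere in the DAG file.
manual_sign_off = PythonOperator(
    task_id='manual_sign_off',
    dag=dag,
    python_callable=always_fail
)

start >> manual_sign_off >> end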
Robert Elliot answered Apr 08 '23 14:04