 

Status of Airflow task within the dag

Tags:

airflow

I need the status of a task — whether it is running, up_for_retry, or failed — from within the same DAG. I tried to get it using the code below, but got no useful output...

Auto = PythonOperator(
    task_id='test_sleep',
    python_callable=execute_on_emr,
    op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
    dag=dag)

logger.info(Auto)

The intention is to kill certain running tasks once a particular task on airflow completes.

The question is: how do I get the state of a task — is it running, failed, or success?

Chetan J asked May 02 '17 07:05

People also ask

How do you check the status of an Airflow task?

Start by grabbing the task_ids and state of the task you're interested in with a db call. That should give you the state (and name, for reference) of the task you're trying to monitor. State is stored as a simple lowercase string.
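That db call can be sketched with plain SQL. The column names below follow Airflow's task_instance metadata table, but the in-memory sqlite database, the dag_id, and the inserted row are stand-ins for illustration only — the real metadata db is usually Postgres or MySQL:

```python
import sqlite3

# Stand-in for the Airflow metadata database; the real task_instance
# table has more columns, but these are the ones the query needs.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT
    )
""")
conn.execute(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    ("test_sleep", "my_dag", "2017-05-02T07:05:00", "running"),
)

# Grab the task_id and state of the task you are interested in.
row = conn.execute(
    "SELECT task_id, state FROM task_instance "
    "WHERE dag_id = ? AND task_id = ?",
    ("my_dag", "test_sleep"),
).fetchone()
print(row)  # ('test_sleep', 'running')
```

State comes back as a simple lowercase string ('running', 'success', 'failed', ...), so it can be compared directly.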

What is the typical journey of a task in Airflow?

Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them in order to express the order they should run in. There are three basic kinds of Task: Operators, predefined task templates that you can string together quickly to build most parts of your DAGs; Sensors, a special subclass of Operators that wait for an external event to happen; and TaskFlow-decorated @task functions, custom Python functions packaged up as Tasks.

What is DAG and task in Airflow?

In Airflow, a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.


1 Answer

I am doing something similar. I need to check, for one task, whether the previous 10 runs of another task were successful. taky2 sent me on the right path. It is actually fairly easy:

from airflow.models import TaskInstance
ti = TaskInstance(your_task, execution_date)
state = ti.current_state()

Since I want to check this from within the DAG, it is not necessary to specify the dag. I simply created a function to loop over the past n days and check the status.

from datetime import timedelta

def check_status(**kwargs):
    last_n_days = 10
    for n in range(last_n_days):
        date = kwargs['execution_date'] - timedelta(n)
        # my_task is the task object you defined within the DAG, not the task_id
        # (as in the example below: check_success_task rather than 'check_success_days_before')
        ti = TaskInstance(my_task, date)
        state = ti.current_state()
        if state != 'success':
            raise ValueError('Not all previous tasks successfully completed.')

When you call the function make sure to set provide_context.

check_success_task = PythonOperator(
    task_id='check_success_days_before',
    python_callable=check_status,
    provide_context=True,
    dag=dag
)
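As an Airflow-free sketch of the loop above, with the TaskInstance lookup replaced by a hypothetical current_state() stub (the stub, its _states dict, and the dates are made up for illustration — in the real DAG you would query TaskInstance as shown):

```python
from datetime import datetime, timedelta

# Hypothetical stand-in for TaskInstance(...).current_state():
# maps an execution date to a recorded state, defaulting to 'success'.
_states = {}

def current_state(date):
    return _states.get(date, 'success')

def check_status(execution_date, last_n_days=10):
    # Walk backwards over the past n runs; fail fast on the first non-success.
    for n in range(last_n_days):
        date = execution_date - timedelta(days=n)
        if current_state(date) != 'success':
            raise ValueError('Not all previous tasks successfully completed.')
    return True

today = datetime(2017, 5, 2)
ok = check_status(today)  # all stubbed days default to 'success'

# Mark one past run as failed; the check now raises, which in Airflow
# would fail the PythonOperator task.
_states[today - timedelta(days=3)] = 'failed'
try:
    check_status(today)
    failure_detected = False
except ValueError:
    failure_detected = True
```

The same fail-fast shape is what makes the operator above block downstream work when any of the last n runs was not successful.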

UPDATE: When you want to reference a task from another DAG, you need to look it up like this:

from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance

dag_folder = conf.get('core', 'DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[my_dag_id]
my_task = check_dag.get_task(my_task_id)
ti = TaskInstance(my_task, date)

Apparently there is now also an API call that does the same thing:

from airflow.api.common.experimental.get_task_instance import get_task_instance
ti = get_task_instance(my_dag_id, my_task_id, date)
Krischl answered Sep 22 '22 13:09