 

Passing arguments using the TaskFlow API in Airflow 2.0

Tags:

airflow

I am passing arguments to a TaskFlow-based DAG using the REST API. Looking at similar issues raised on this forum, these seem to be the usual ways of accessing the passed arguments:

# From inside a template field or file:
{{ dag_run.conf['key'] }}
# Or when the context is available, e.g. within a Python callable of the PythonOperator:
context['dag_run'].conf['key']
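
For reference, this is roughly how I am sending the arguments: a minimal sketch using the stable REST API, assuming basic auth is enabled (the host, credentials, and conf payload are placeholders for my setup):

import requests

# Trigger a run of the classic_xgb DAG with a conf payload.
# Assumes the Airflow 2.0 stable REST API with basic auth enabled;
# host, credentials, and conf values are placeholders.
response = requests.post(
    "http://localhost:8080/api/v1/dags/classic_xgb/dagRuns",
    auth=("airflow", "airflow"),
    json={"conf": {"key": "value"}},
)
response.raise_for_status()
print(response.json())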

I attempted to get the context dictionary inside the DAG function:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {'owner': 'airflow'}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), params=None)
def classic_xgb(**context):
    """
    ### TaskFlow API Tutorial Documentation
    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        print("context is ", context)

The output is <airflow.models.dagparam.DagParam object at 0x7f735c634510>. Now how do I get to the conf dictionary that was passed as input to the DAG? I need to use the arguments in my Python code, so the templating option does not seem viable for me.

Any help would be most appreciated.

Thanks

Best Regards,

Adeel

asked Jan 07 '21 by Adeel Hashmi

People also ask

What is the TaskFlow API?

The TaskFlow API is an abstraction built on top of XComs that allows developers to send messages between tasks in a DAG (Directed Acyclic Graph).
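
A minimal sketch of that message passing (the task and value names are made up): the value returned by extract is stored as an XCom behind the scenes and handed to transform automatically.

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(2))
def xcom_messaging_example():
    @task()
    def extract():
        # The return value is pushed to XCom behind the scenes.
        return {"rows": 42}

    @task()
    def transform(payload):
        # The argument is pulled from XCom behind the scenes.
        print(payload["rows"])

    transform(extract())

xcom_messaging_dag = xcom_messaging_example()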

How do I create a dynamic task in Airflow?

Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
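
A minimal sketch of that map/reduce shape (dynamic task mapping requires Airflow 2.3+; the task and value names here are made up):

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(2))
def mapping_example():
    @task()
    def add_one(x):
        # "Map": one task instance is created per input value.
        return x + 1

    @task()
    def total(values):
        # "Reduce": operates on the collected output of the mapped task.
        print(sum(values))

    total(add_one.expand(x=[1, 2, 3]))

mapping_dag = mapping_example()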

What is a SubDAG in Airflow?

SubDAGs were a legacy feature in Airflow that allowed users to implement reusable patterns of tasks in their DAGs. SubDAGs caused performance and functional issues for many users; they have been deprecated as of Airflow 2.0 and will be removed entirely in a future release.

Should you use the TaskFlow API in Apache Airflow?

The TaskFlow API is a feature that promises data-sharing functionality and a simple interface for building data pipelines in Apache Airflow 2.0. It should allow end users to write Python code rather than Airflow code.

What is the TaskFlow API paradigm?

This tutorial builds on the regular Airflow Tutorial and focuses specifically on writing data pipelines using the TaskFlow API paradigm, which was introduced as part of Airflow 2.0, and contrasts this with DAGs written using the traditional paradigm.
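
To make the contrast concrete, here is a sketch of the traditional paradigm the tutorial compares against: explicit PythonOperator instances with manual XCom push/pull (DAG id, task names, and values are made up):

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def extract(**context):
    # Traditional style: push the value to XCom explicitly.
    context["ti"].xcom_push(key="rows", value=42)

def transform(**context):
    # ...and pull it back explicitly in the downstream task.
    print(context["ti"].xcom_pull(key="rows", task_ids="extract"))

with DAG("traditional_example", schedule_interval=None, start_date=days_ago(2)) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    extract_task >> transform_task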

What is a TaskGroup in Airflow?

Apart from the TaskFlow API, Airflow 2.0 provides an abstraction that allows treating a group of tasks as a single task within a DAG. The implementation of the TaskGroup was motivated by the inefficiencies and bugs that many data engineers experienced when using SubDAGs.
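
A minimal sketch of grouping tasks this way (the group and task names are made up; TaskGroup is available from Airflow 2.0):

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

@dag(schedule_interval=None, start_date=days_ago(2))
def task_group_example():
    @task()
    def clean():
        print("clean")

    @task()
    def validate():
        print("validate")

    @task()
    def train():
        print("train")

    # Tasks created inside the TaskGroup context are rendered as a
    # single collapsible node in the Airflow UI.
    with TaskGroup(group_id="preprocessing") as preprocessing:
        clean() >> validate()

    train_task = train()
    preprocessing >> train_task

task_group_dag = task_group_example()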

How are dependencies generated with the TaskFlow API?

In contrast, with the TaskFlow API in Airflow 2.0, the invocation itself automatically generates the dependencies, as shown below. If you have tasks that require complex or conflicting requirements, you can use the TaskFlow API with either a Docker container (since version 2.2.0) or a Python virtual environment (since version 2.0.2).
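
A minimal sketch of that implicit dependency generation (the task and value names are made up): calling transform with extract's return value wires extract upstream of transform, with no explicit >> needed.

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(2))
def implicit_dependencies_example():
    @task()
    def extract():
        return [1, 2, 3]

    @task()
    def transform(data):
        return sum(data)

    @task()
    def load(total):
        print(total)

    # extract >> transform >> load is inferred from these calls.
    load(transform(extract()))

implicit_dag = implicit_dependencies_example()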


1 Answer

There is a new function, get_current_context(), to fetch the context in Airflow 2.0. Once you have the context dict, the 'params' key contains the arguments sent to the DAG via the REST API. The following code solved the issue.

from airflow.decorators import dag
from airflow.operators.python import task, get_current_context
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def classic_xgb(**kwargs):
    @task()
    def extract():
        # Fetch the runtime context; 'params' holds the REST API arguments.
        context = get_current_context()
        print(context['params'])

    extract()

classic_xgb_dag = classic_xgb()

answered Dec 27 '22 by Adeel Hashmi