I am passing arguments to a TaskFlow-based DAG using the REST API. Looking at similar issues raised on this forum, the usual ways of accessing the passed arguments seem to be:
#From inside a template field or file:
{{ dag_run.conf['key'] }}
#Or when context is available, e.g. within a python callable of the PythonOperator:
context['dag_run'].conf['key']
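For reference, here is a minimal sketch of the second approach with a classic PythonOperator; the dag_id conf_example, the key name 'key', and the start date are assumptions for illustration:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def print_conf(**context):
    # dag_run.conf holds whatever was posted in the "conf" field of the trigger call
    print("key is", context['dag_run'].conf.get('key'))


with DAG(dag_id='conf_example', schedule_interval=None, start_date=datetime(2021, 1, 1)) as dag:
    PythonOperator(task_id='print_conf', python_callable=print_conf)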
I attempted to get the context dictionary like this:
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), params=None)
def classic_xgb(**context):
    """
    ### TaskFlow API Tutorial Documentation
    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        print("context is ", context)
The output is <airflow.models.dagparam.DagParam object at 0x7f735c634510>. How do I get to the conf dictionary that was passed as the input argument to the DAG? I need to use the arguments in my Python code, so the templating option does not seem viable for me.
Any help would be most appreciated.
Thanks
Best Regards,
Adeel
Airflow 2.0 provides a new function, get_current_context(), to fetch the context from within a task. Once you have the context dict, the 'params' key contains the arguments sent to the DAG via the REST API. The following code solved the issue.
from airflow.decorators import dag
from airflow.operators.python import task, get_current_context
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def classic_xgb(**kwargs):
    @task()
    def extract():
        # Fetch the runtime context and read the arguments passed via the REST API
        context = get_current_context()
        print("params are ", context['params'])

    extract()

classic_xgb_dag = classic_xgb()
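For completeness, this is how the DAG can then be triggered through the stable REST API with a conf payload; the endpoint host, credentials, and payload values below are assumptions for illustration:

import requests

# Trigger a run of classic_xgb and pass arguments in the "conf" field of the request body.
response = requests.post(
    "http://localhost:8080/api/v1/dags/classic_xgb/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {"key": "value"}},
)
print(response.json())

Inside the task, the same dictionary is also reachable as context['dag_run'].conf, which can be handy if the values do not show up under 'params' in your Airflow version.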