I want to access the "params" of a dag in the body of the day, but I receive an error message that params is not defined. For example:
from datetime import datetime, timedelta
from airflow import DAG
default_args = {
'owner': 'airflow',
'retries': 0,
}
with DAG(
dag_id="start-matrix",
default_args=default_args,
render_template_as_native_obj=True,
params={
'proj' : "team",
'username' : "leebee",
},
schedule_interval=None,
start_date=datetime(2021, 4, 5, 15, 0),
catchup=False
) as dag:
def test(proj, name, dag):
print(proj + " " + name)
pass
test({{ params.proj }}, {{ params.username }}, dag)
pass
But when I load the dag I get this error:
Broken DAG: [/local/airflow/dags/baddag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/local/airflow/dags/baddag.py", line 26, in <module>
test({{ params.proj }}, {{ params.username }}, dag)
NameError: name 'params' is not defined
Is it possible to do this?
Your function test isn't registered as an Airflow task. It's being called as a regular Python function. That won't work for two reasons:
params isn't known within this scope (therefore you get that error){{ ... }} as a set in a set. Say params was replaced by a known variable, you'd still get an error because sets aren't hashable (try evaluating {{ "test" }})params is a variable known in Airflow at runtime. Meaning, when a task is executed in Airflow, a list of variables is gathered that is available whilst running the task. See the full list here: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables.
To tell Airflow to call a Python function in a task, you could use the PythonOperator:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
"owner": "airflow",
"retries": 0,
}
with DAG(
dag_id="start-matrix",
default_args=default_args,
render_template_as_native_obj=True,
params={
"proj": "team",
"username": "leebee",
},
schedule_interval=None,
start_date=datetime(2021, 4, 5, 15, 0),
catchup=False,
) as dag:
def test(proj, name):
print(proj + " " + name)
pass
PythonOperator(
task_id="test",
python_callable=test,
op_kwargs={"proj": "{{ params.proj }}", "name": "{{ params.username }}"},
)
Let's break it down. First, the import:
from airflow.operators.python import PythonOperator
Next, the PythonOperator:
PythonOperator(
task_id="test",
python_callable=test,
op_kwargs={"proj": "{{ params.proj }}", "name": "{{ params.username }}"},
)
The arguments:
task_id represents the name of the task in the UIpython_callable references the Python function this task should callop_kwargs is a way to pass arguments to the function referenced by python_callable. Since we don't know the value of params.proj at runtime yet, you'll need to supply templated variables, which are strings evaluated at runtime. So e.g. "{{ params.proj }}" is evaluated at runtime, meaning at runtime the value of params.proj is looked up and the templated string is replaced by the value of params.proj. This value is then given to the argument named proj. Read more about templating here: https://docs.astronomer.io/learn/templating.A slightly shorter way to write this in Airflow is with the TaskFlow API (see 1, 2):
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context
default_args = {
"owner": "airflow",
"retries": 0,
}
with DAG(
dag_id="start-matrix",
default_args=default_args,
render_template_as_native_obj=True,
params={
"proj": "team",
"username": "leebee",
},
schedule_interval=None,
start_date=datetime(2021, 4, 5, 15, 0),
catchup=False,
) as dag:
@task
def test():
context = get_current_context()
print(context["params"]["proj"] + " " + context["params"]["username"])
test()
Here, you don't have to provide proj and username via function arguments, but can fetch their values from the context which is fetched with get_current_context(). The @task decorator automatically turns the function into a PythonOperator internally. When using Python functions only, this is generally a preferred way of writing Airflow code.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With