Access "params" in body of dag

Tags:

airflow

I want to access the "params" of a DAG in the body of the DAG, but I receive an error message that params is not defined. For example:

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'retries': 0,
}

with DAG(
        dag_id="start-matrix",
        default_args=default_args,
        render_template_as_native_obj=True,
        params={
            'proj' : "team",
            'username' : "leebee",
        },
        schedule_interval=None,
        start_date=datetime(2021, 4, 5, 15, 0),
        catchup=False
) as dag:
    def test(proj, name, dag):
        print(proj + " " + name)
        pass

    test({{ params.proj }}, {{ params.username }}, dag)
    pass

But when I load the dag I get this error:

Broken DAG: [/local/airflow/dags/baddag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/local/airflow/dags/baddag.py", line 26, in <module>
    test({{ params.proj }}, {{ params.username }}, dag)
NameError: name 'params' is not defined

Is it possible to do this?

asked Oct 23 '25 by user1943569

1 Answer

Your function test isn't registered as an Airflow task. It's being called as a regular Python function. That won't work for two reasons:

  1. The variable params isn't known within this scope (therefore you get that error)
  2. Python actually parses {{ ... }} as a set nested inside a set. Even if params were a known variable, you'd still get an error, because sets aren't hashable (try evaluating {{ "test" }})

params is a variable that Airflow makes available at runtime. When a task executes, Airflow assembles a collection of context variables that are available while the task runs. See the full list here: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables.
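As a rough illustration, you can think of that runtime context as a plain dictionary. The keys below are a small hand-picked subset for illustration only (see the linked reference for the real list); this is not Airflow code:

```python
# Illustrative sketch only: a hand-assembled dict mimicking a few of the
# context variables Airflow provides while a task runs.
context = {
    "ds": "2021-04-05",  # the logical date as a YYYY-MM-DD string
    "params": {"proj": "team", "username": "leebee"},
}

# At runtime, a template like "{{ params.proj }}" is resolved against
# this context:
print(context["params"]["proj"])  # team
```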

To tell Airflow to call a Python function in a task, you could use the PythonOperator:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "retries": 0,
}

with DAG(
    dag_id="start-matrix",
    default_args=default_args,
    render_template_as_native_obj=True,
    params={
        "proj": "team",
        "username": "leebee",
    },
    schedule_interval=None,
    start_date=datetime(2021, 4, 5, 15, 0),
    catchup=False,
) as dag:

    def test(proj, name):
        print(proj + " " + name)
        pass

    PythonOperator(
        task_id="test",
        python_callable=test,
        op_kwargs={"proj": "{{ params.proj }}", "name": "{{ params.username }}"},
    )

Let's break it down. First, the import:

from airflow.operators.python import PythonOperator

Next, the PythonOperator:

PythonOperator(
    task_id="test",
    python_callable=test,
    op_kwargs={"proj": "{{ params.proj }}", "name": "{{ params.username }}"},
)

The arguments:

  • task_id represents the name of the task in the UI
  • python_callable references the Python function this task should call
  • op_kwargs is a way to pass arguments to the function referenced by python_callable. Since the value of params.proj isn't known when the DAG file is parsed, you supply templated variables: strings that are evaluated at runtime. For example, "{{ params.proj }}" is resolved at runtime, meaning the value of params.proj is looked up and substituted into the string, and the result is passed to the argument named proj. Read more about templating here: https://docs.astronomer.io/learn/templating.
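Under the hood, Airflow renders these templated strings with Jinja. A minimal standalone sketch of the substitution, using the jinja2 library directly outside Airflow (the params dict here mirrors the one defined on the DAG):

```python
from jinja2 import Template

params = {"proj": "team", "username": "leebee"}

# Render the same templated strings Airflow would render at runtime,
# with params exposed as a template variable.
proj = Template("{{ params.proj }}").render(params=params)
name = Template("{{ params.username }}").render(params=params)
print(proj, name)  # team leebee
```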

A slightly shorter way to write this in Airflow is with the TaskFlow API:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context

default_args = {
    "owner": "airflow",
    "retries": 0,
}

with DAG(
    dag_id="start-matrix",
    default_args=default_args,
    render_template_as_native_obj=True,
    params={
        "proj": "team",
        "username": "leebee",
    },
    schedule_interval=None,
    start_date=datetime(2021, 4, 5, 15, 0),
    catchup=False,
) as dag:

    @task
    def test():
        context = get_current_context()
        print(context["params"]["proj"] + " " + context["params"]["username"])

    test()

Here, you don't have to pass proj and username as function arguments; instead you retrieve their values from the task context, obtained via get_current_context(). The @task decorator automatically turns the function into a PythonOperator internally. When working with plain Python functions, this is generally the preferred way to write Airflow code.
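To make the "automatically turns the function into an operator" idea concrete, here is a toy, Airflow-free analogy of what a decorator like @task does conceptually. FakeOperator and this task decorator are invented purely for illustration; Airflow's real implementation is considerably more involved:

```python
# Toy analogy, NOT Airflow internals: a decorator that wraps a plain
# function in an operator-like object, as @task does in spirit.
class FakeOperator:
    def __init__(self, func):
        self.func = func
        self.task_id = func.__name__  # default task_id from the function name

    def execute(self, context):
        # A real operator would receive the runtime context here.
        return self.func()

def task(func):
    # Replace the function with an operator-like wrapper around it.
    return FakeOperator(func)

@task
def greet():
    return "hello"

print(greet.task_id)      # greet
print(greet.execute({}))  # hello
```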

answered Oct 26 '25 by Bas Harenslak