 

How do I use the --conf option in Airflow?

I am trying to run an Airflow DAG and need to pass some parameters to the tasks.

How do I read, in the Python DAG file, the JSON string passed as the --conf parameter of the command-line trigger_dag command?

ex: airflow trigger_dag 'dag_name' -r 'run_id' --conf '{"key":"value"}'

asked Aug 29 '17 by Praveen Singh

People also ask

How do I change Airflow config?

Airflow's configuration lives in airflow.cfg in your $AIRFLOW_HOME directory (~/airflow by default). You can edit this file to change any of the settings. You can also set options with environment variables by using this format: AIRFLOW__{SECTION}__{KEY} (note the double underscores).
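For example, setting the environment variable AIRFLOW__CORE__PARALLELISM=64 overrides the parallelism option in the [core] section of airflow.cfg.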

How do I access Airflow cfg files?

In Bitnami installations, the Apache Airflow configuration file is located at /opt/bitnami/airflow/airflow.cfg.

How do you use parameters in Airflow?

You can pass parameters from the CLI using --conf '{"key":"value"}' and then read them in the DAG file as "{{ dag_run.conf['key'] }}" in a templated field.


2 Answers

Two ways. From inside a template field or file:

{{ dag_run.conf['key'] }}

Or, when the context is available, e.g. within a Python callable of the PythonOperator:

context['dag_run'].conf['key']
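
For instance, here is a minimal runnable sketch using both access patterns (the dag_id, task ids, and the "key" name are illustrative; imports assume Airflow 1.x, where provide_context=True is required):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id="conf_demo", start_date=days_ago(1), schedule_interval=None)

# 1. Template access: bash_command is a templated field,
#    so the value is rendered from dag_run.conf at runtime
templated = BashOperator(
    task_id="templated",
    bash_command="echo {{ dag_run.conf['key'] }}",
    dag=dag,
)

# 2. Context access: read the same value inside a Python callable
def read_conf(**context):
    print(context["dag_run"].conf["key"])

from_context = PythonOperator(
    task_id="from_context",
    python_callable=read_conf,
    provide_context=True,  # Airflow 1.x; in 2.x the context is passed automatically
    dag=dag,
)

Triggering this DAG with --conf '{"key":"value"}' makes both tasks print value.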
answered Sep 29 '22 by Daniel Huang

In the example provided at https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py#L62, to parse the 'conf' passed in an Airflow REST API call, use provide_context=True in the PythonOperator.

Also, the key-value pairs passed in JSON format in the REST API call can be accessed in the BashOperator and SparkSubmitOperator as '\'{{ dag_run.conf["key"] if dag_run else "" }}\''.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_dag",
    default_args={"start_date": days_ago(2), "owner": "airflow"},
    schedule_interval=None
)

def run_this_func(**context):
    """
    Print the payload passed to the DagRun conf attribute under "key".

    :param context: The execution context
    :type context: dict
    """
    print("context", context)
    print("Remotely received value of {} for 'key'".format(context["dag_run"].conf["key"]))

# PythonOperator usage: provide_context=True passes the context (including dag_run) to the callable
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag, provide_context=True)

# BashOperator usage: bash_command is a templated field, rendered at runtime
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["key"] if dag_run else "" }}\'"',
    dag=dag
)

# SparkSubmitOperator usage: application_args is a templated field as well
spark_conn_id = "spark_default"  # id of an existing Spark connection
spark_task = SparkSubmitOperator(
        task_id="task_id",
        conn_id=spark_conn_id,
        name="task_name",
        application="example.py",
        application_args=[
            '--key', '\'{{ dag_run.conf["key"] if dag_run else "" }}\''
        ],
        num_executors=10,
        executor_cores=5,
        executor_memory='30G',
        #driver_memory='2G',
        conf={'spark.yarn.maxAppAttempts': 1},
        dag=dag)
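
The DAG above can then be triggered with a conf payload from the command line, just as in the question:

airflow trigger_dag 'example_dag' -r 'run_id' --conf '{"key":"value"}'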
answered Sep 29 '22 by zingsy