
Passing a command line argument to airflow BashOperator

Is there a way to pass a command-line argument to the Airflow BashOperator? Currently, I have a Python script that accepts a date argument and performs some specific activities, such as cleaning up folders older than the given date.

In simplified code with just one task, what I would like to do is

from __future__ import print_function
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

default_args = {
    'owner'             : 'airflow'
    ,'depends_on_past'  : False
    ,'start_date'       : datetime(2017, 1, 18)
    ,'email'            : ['[email protected]']
    ,'retries'          : 1
    ,'retry_delay'      : timedelta(minutes=5)
}

dag = DAG(
    dag_id='data_dir_cleanup'
    ,default_args=default_args
    ,schedule_interval='0 13 * * *'
    ,dagrun_timeout=timedelta(minutes=10)
    )

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command='python cleanup.py --date $DATE >> /tmp/airflow/data_dir_cleanup.log 2>&1'
        #--------------------------------------^^^^^^-- (DATE variable which would have been given on command line)
        #,env=env
        ,dag=dag
    )

Thanks in advance,

Shiva asked Feb 03 '17 03:02

People also ask

How do you pass parameters in Airflow?

You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
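A minimal sketch of the two halves together; the task id and the conf key `"key"` are illustrative, and `dag` is assumed to be an existing DAG object:

```python
# Triggered manually with:
#   airflow trigger_dag <dag_id> --conf '{"key": "value"}'
from airflow.operators.bash_operator import BashOperator

conf_task = BashOperator(
    task_id='print_conf_value',
    # dag_run.conf is only populated for manually triggered runs
    bash_command='echo {{ dag_run.conf["key"] }}',
    dag=dag,  # assumes an existing DAG object named `dag`
)
```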

What is BashOperator in Airflow?

The Airflow BashOperator is used on the system to run a Bash script, command, or group of commands. You can import Airflow BashOperator using the following command: from airflow.operators.bash_operator import BashOperator.
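A minimal usage sketch (assumes Airflow is installed and a DAG object named `dag` exists):

```python
from airflow.operators.bash_operator import BashOperator

# Runs a single shell command when the task executes
hello = BashOperator(
    task_id='say_hello',
    bash_command='echo "hello from the BashOperator"',
    dag=dag,  # assumes an existing DAG object named `dag`
)
```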

How do you use an environment variable in Airflow?

Airflow Variables can also be created and managed using environment variables. The environment variable naming convention is AIRFLOW_VAR_{VARIABLE_NAME}, all uppercase. So if your variable key is FOO, then the environment variable name should be AIRFLOW_VAR_FOO. Note the single underscores surrounding VAR.
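The naming convention can be sketched in plain Python without Airflow installed; the key "foo" and the helper function below are illustrative only:

```python
import os

# Assumption: Airflow resolves Variable.get("foo") by looking up the
# environment variable AIRFLOW_VAR_FOO (the key uppercased, prefixed).
os.environ["AIRFLOW_VAR_FOO"] = "bar"

def airflow_env_var_name(key):
    """Derive the environment-variable name for an Airflow Variable key."""
    return "AIRFLOW_VAR_" + key.upper()

print(airflow_env_var_name("foo"))              # AIRFLOW_VAR_FOO
print(os.environ[airflow_env_var_name("foo")])  # bar
```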


1 Answer

The BashOperator is templated with Jinja2, meaning that you can pass arbitrary values. In your case it would be something like:

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command="python cleanup.py --date {{ params.DATE }} >> /tmp/airflow/data_dir_cleanup.log 2>&1"
        ,params={'DATE': 'this-should-be-a-date'}
        ,dag=dag
    )

See also: https://airflow.incubator.apache.org/tutorial.html#templating-with-jinja for a broader example.
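Since the date here is tied to the run itself, it may be worth noting that Airflow also exposes the execution date directly as the built-in `{{ ds }}` macro (formatted YYYY-MM-DD), which avoids passing anything in at all. A sketch of the same task using it, assuming the `dag` object from the question:

```python
# Sketch only: `ds` is Airflow's built-in execution-date macro (YYYY-MM-DD)
cleanup_task = BashOperator(
    task_id='task_1_data_file_cleanup',
    bash_command='python cleanup.py --date {{ ds }} >> /tmp/airflow/data_dir_cleanup.log 2>&1',
    dag=dag,  # assumes the `dag` object defined in the question
)
```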

Bolke de Bruin answered Sep 27 '22 22:09