 

Airflow Macros In Python Operator

Tags:

airflow

I'm trying to use Airflow macros in my PythonOperator, but I keep receiving "airflow: error: unrecognized arguments:".

I'm importing a function that takes three positional arguments, which I call with (sys.argv, start_date, end_date), and I want both start_date and end_date to be Airflow's execution date.

The function signature looks something like this:

def main(argv, start_date, end_date):
    ...

Here is the task I have in the DAG:

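import sys

from airflow.operators.python_operator import PythonOperator

# main is imported and dag is defined elsewhere in the DAG file
t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv, '{{ ds }}', '{{ ds }}'],
    dag=dag)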
asked Jun 05 '18 20:06 by D.Lee




2 Answers

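Since you're passing in dates that need to be rendered by Airflow, you'll want to use the templates_dict parameter of the PythonOperator. In the Airflow release current at the time of the question, this was the only PythonOperator field that Airflow recognized as containing templates (Airflow 1.10 and later also template op_args and op_kwargs).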

You can create a custom Python operator that recognizes more fields as templates by subclassing or copying the existing operator and adding the relevant fields to its template_fields tuple.
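To make the template_fields mechanism concrete, here is a toy model in plain Python (no Airflow or Jinja needed). It is not Airflow's implementation: the ToyPythonOperator and TemplatedToyPythonOperator classes, the render helper, and the render_template_fields method are all hypothetical, and the regex only understands simple {{ name }} placeholders. It shows that only attributes listed in template_fields get rendered, and that a subclass can extend that tuple to cover op_args.

```python
import re

# Matches simple "{{ name }}" placeholders; a toy substitute for Jinja
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Recursively substitute placeholders in strings, lists, tuples and dicts."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, (list, tuple)):
        return type(value)(render(v, context) for v in value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value  # anything else is left untouched


class ToyPythonOperator:
    # Like older PythonOperator releases: only templates_dict is templated
    template_fields = ("templates_dict",)

    def __init__(self, op_args=None, templates_dict=None):
        self.op_args = op_args or []
        self.templates_dict = templates_dict or {}

    def render_template_fields(self, context):
        # Only attributes named in template_fields are rendered; others stay literal
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))


class TemplatedToyPythonOperator(ToyPythonOperator):
    # The "custom operator" idea: extend template_fields so op_args is rendered too
    template_fields = ("templates_dict", "op_args")


if __name__ == "__main__":
    ctx = {"ds": "2018-06-05"}  # made-up context for illustration
    plain = ToyPythonOperator(op_args=["{{ ds }}"], templates_dict={"end_date": "{{ ds }}"})
    plain.render_template_fields(ctx)
    print(plain.op_args, plain.templates_dict)  # ['{{ ds }}'] {'end_date': '2018-06-05'}
    custom = TemplatedToyPythonOperator(op_args=["{{ ds }}"])
    custom.render_template_fields(ctx)
    print(custom.op_args)  # ['2018-06-05']
```

In real Airflow the same idea is a class attribute override on a PythonOperator subclass, e.g. template_fields = ('templates_dict', 'op_args').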

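import sys

from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path


def main(**kwargs):
    # With provide_context=True, Airflow passes the rendered templates_dict as a keyword argument
    templates_dict = kwargs.get('templates_dict') or {}
    argv = templates_dict.get('argv')
    start_date = templates_dict.get('start_date')  # rendered from {{ yesterday_ds }}
    end_date = templates_dict.get('end_date')      # rendered from {{ ds }}
    # ... pull the report for start_date..end_date here


# dag is assumed to be defined earlier in the DAG file
t1 = PythonOperator(task_id='Pull_DCM_Report',
                    provide_context=True,
                    python_callable=main,
                    templates_dict={'argv': sys.argv,
                                    'start_date': '{{ yesterday_ds }}',
                                    'end_date': '{{ ds }}'},
                    dag=dag)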
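For reference, Airflow's ds macro is the execution date formatted as YYYY-MM-DD, and yesterday_ds is the day before in the same format, so the templates_dict above yields a one-day window ending on the execution date. A plain-Python sketch of that arithmetic, assuming a simple date object (airflow_style_ds and report_window are hypothetical helpers, and time zones are ignored):

```python
from datetime import date, timedelta


def airflow_style_ds(execution_date):
    """Format a date the way Airflow's ds macro does: YYYY-MM-DD."""
    return execution_date.strftime("%Y-%m-%d")


def report_window(execution_date):
    """Return (start_date, end_date) as the templates_dict above would render them."""
    start = airflow_style_ds(execution_date - timedelta(days=1))  # like {{ yesterday_ds }}
    end = airflow_style_ds(execution_date)                        # like {{ ds }}
    return start, end


# Using an arbitrary example date
print(report_window(date(2018, 6, 5)))  # ('2018-06-04', '2018-06-05')
```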
answered Oct 17 '22 02:10 by Ben Gregory


You can "wrap" the call to the main function with the following:

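from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path

# main and dag are assumed to be defined elsewhere in the DAG file.
# With provide_context=True, the task context (including "ds") arrives as keyword
# arguments; an empty list is passed in place of sys.argv.
t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=lambda **context: main([], context["ds"], context["ds"]),
    dag=dag)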

If lambdas aren't your cup of tea, you could define a named function that calls main and pass that as python_callable instead.
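A sketch of that named-function variant, runnable without Airflow installed. The stub main below only stands in for the imported function, and pull_dcm_report is a hypothetical name; in the DAG you would pass python_callable=pull_dcm_report together with provide_context=True.

```python
def main(argv, start_date, end_date):
    # Hypothetical stand-in for the real, imported main(); it just echoes its inputs
    return argv, start_date, end_date


def pull_dcm_report(**context):
    # With provide_context=True, Airflow calls this with the task context as keyword
    # arguments; "ds" is the execution date as a YYYY-MM-DD string
    return main([], context["ds"], context["ds"])


if __name__ == "__main__":
    # Simulate the call Airflow would make, using a made-up context
    print(pull_dcm_report(ds="2018-06-05"))  # ([], '2018-06-05', '2018-06-05')
```

One practical upside of a named function over a lambda is that tracebacks in the task log show pull_dcm_report rather than <lambda>.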

answered Oct 17 '22 02:10 by joebeeson