Can I use macros with the PythonOperator? I tried following, but I was unable to get the macros rendered:
dag = DAG( 'temp', default_args=default_args, description='temp dag', schedule_interval=timedelta(days=1)) def temp_def(a, b, **kwargs): print '{{ds}}' print '{{execution_date}}' print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)) ds = '{{ ds }}' mm = '{{ execution_date }}' t1 = PythonOperator( task_id='temp_task', python_callable=temp_def, op_args=[mm , ds], provide_context=False, dag=dag)
The Airflow PythonOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute a Python callable function from your DAG.
provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
Macros only get processed for templated fields. To get Jinja to process this field, extend the PythonOperator
with your own.
class MyPythonOperator(PythonOperator): template_fields = ('templates_dict','op_args')
I added 'templates_dict'
to the template_fields
because the PythonOperator
itself has this field templated: PythonOperator
Now you should be able to use a macro within that field:
ds = '{{ ds }}' mm = '{{ execution_date }}' t1 = MyPythonOperator( task_id='temp_task', python_callable=temp_def, op_args=[mm , ds], provide_context=False, dag=dag)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With