The method of getting a BashOperator or SqlOperator to pick up an external file for its template is documented clearly enough, but with the PythonOperator my test of what I understand from the docs is not working. I am not sure how the templates_exts and templates_dict parameters are supposed to interact to pick up a file.
In my dags folder I've created pyoptemplate.sql and pyoptemplate.t, as well as test_python_operator_template.py:
pyoptemplate.sql (pyoptemplate.t is identical):

```sql
SELECT * FROM {{params.table}};
```
test_python_operator_template.py:

```python
# coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def templated_function(ds, **kwargs):
    """This function will try to use templates loaded from external files"""
    pp.pprint(ds)
    pp.pprint(kwargs)


# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
          default_args={"owner": "lamblin",
                        "start_date": datetime.now()},
          template_searchpath=['/Users/daniellamblin/airflow/dags'],
          schedule_interval='@once')

# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
                    provide_context=True,
                    python_callable=templated_function,
                    templates_dict={
                        'pyoptemplate': '',
                        'pyoptemplate.sql': '',
                        'sql': 'pyoptemplate',
                        'file1': 'pyoptemplate.sql',
                        'file2': 'pyoptemplate.t',
                        'table': '{{params.table}}'},
                    templates_exts=['.sql', '.t'],
                    params={'condition_param': True,
                            'message': 'Hello World',
                            'table': 'TEMP_TABLE'},
                    dag=dag)
```
The result from a run shows that table was templated correctly as a string, but the others did not pull in any files for templating.
```
dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{   u'END_DATE': '2017-01-18',
    u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
    u'dag': <DAG: test_python_operator_template_dag>,
    u'dag_run': None,
    u'ds_nodash': u'20170118',
    u'end_date': '2017-01-18',
    u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
    u'latest_date': '2017-01-18',
    u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
    u'params': {   'condition_param': True,
                   'message': 'Hello World',
                   'table': 'TEMP_TABLE'},
    u'run_id': None,
    u'tables': None,
    u'task': <Task(PythonOperator): test_python_operator_template>,
    u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
    'templates_dict': {   'file1': u'pyoptemplate.sql',
                          'file2': u'pyoptemplate.t',
                          'pyoptemplate': u'',
                          'pyoptemplate.sql': u'',
                          'sql': u'pyoptemplate',
                          'table': u'TEMP_TABLE'},
    u'test_mode': True,
    u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'tomorrow_ds': '2017-01-19',
    u'tomorrow_ds_nodash': u'20170119',
    u'ts': '2017-01-18T00:00:00',
    u'ts_nodash': u'20170118T000000',
    u'yesterday_ds': '2017-01-17',
    u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None
```
As of Airflow 1.8, the way the PythonOperator replaces its template_ext field in __init__ doesn't work. Tasks only check template_ext on the __class__. To create a PythonOperator that picks up SQL template files you only need to do the following:
```python
class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)
```
And then to access the SQL from your task when it runs:
```python
SQLTemplatedPythonOperator(
    templates_dict={'query': 'my_template.sql'},
    params={'my_var': 'my_value'},
    python_callable=my_func,
    provide_context=True,
)

def my_func(**context):
    context['templates_dict']['query']
```
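To make the last step concrete, here is a minimal sketch of the callable (the print and return are illustrative additions, not part of the answer above): by the time it runs, the value under 'query' is the rendered file contents, not the file name.

```python
def my_func(**context):
    # Airflow has already replaced 'my_template.sql' with the
    # rendered text of that file before the callable runs.
    query = context['templates_dict']['query']
    print(query)
    return query
```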
Recently I came across the same issue and finally solved it. @Ardan's solution is correct, but I want to repeat it as a more complete answer with some details on how Airflow works, for newcomers.
Of course, you first need a subclass like this:
```python
from airflow.operators.python_operator import PythonOperator

class SQLTemplatedPythonOperator(PythonOperator):
    # somehow ('.sql',) doesn't work but a tuple of two works...
    template_ext = ('.sql', '.abcdefg')
```
Assuming you have a SQL template file like the one below:
```sql
# stored at path: $AIRFLOW_HOME/sql/some.sql
select {{ params.some_params }} from my_table;
```
First make sure you add your folder to the search path in your DAG params. Do not pass template_searchpath into default_args and then pass those args to the DAG! It doesn't work.
```python
dag = DAG(
    dag_id="some_name",
    default_args=args,
    schedule_interval="@once",
    template_searchpath='/Users/your_name/some_path/airflow_home/sql'
)
```
Then your operator call will be:
```python
SQLTemplatedPythonOperator(
    templates_dict={'query': 'some.sql'},
    op_kwargs={"args_directly_passed_to_your_function": "some_value"},
    task_id='dummy',
    params={"some_params": "some_value"},
    python_callable=your_func,
    provide_context=True,
    dag=dag,
)
Your function will be:
```python
def your_func(args_directly_passed_to_your_function=None, **context):
    query = context['templates_dict']['query']
    do_some_thing(query)
```
Some explanations:
Airflow uses values from the context to render your template. To add your own values to that context, you can use the params field as above.
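Under the hood this is plain Jinja rendering. A standalone sketch (using jinja2 directly, outside Airflow) of how the params dict fills in the template:

```python
from jinja2 import Template

# Standalone illustration of the rendering step: the template text is
# rendered against a context that contains the task's `params` dict.
sql = "select {{ params.some_params }} from my_table;"
rendered = Template(sql).render(params={"some_params": "some_value"})
print(rendered)  # select some_value from my_table;
```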
As @Ardan mentioned, PythonOperator does not take the template file extension from the template_ext field any more. The source code is here. It only takes extensions from self.__class__.template_ext.
Airflow loops through the templates_dict field and, if value.endswith(file_extension) == True, it renders the template.
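That resolution rule can be sketched like this (a simplified stand-in I wrote for illustration, not Airflow's actual source): string values in templates_dict whose name ends with a registered extension are swapped for the rendered contents of that file, while everything else passes through untouched.

```python
# Simplified sketch of the templates_dict resolution rule described
# above; render_file stands in for Airflow's load-and-render step.
def resolve_templates(templates_dict, template_ext, render_file):
    resolved = {}
    for key, value in templates_dict.items():
        if isinstance(value, str) and value.endswith(tuple(template_ext)):
            # Looks like a template file: replace it with rendered text.
            resolved[key] = render_file(value)
        else:
            # No matching extension: keep the value as-is.
            resolved[key] = value
    return resolved
```

With template_ext = ('.sql',), a value like 'some.sql' is resolved while a plain string like 'TEMP_TABLE' is left alone, which matches the behavior seen in the question's log output.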