
TemplateNotFound when using Airflow's PostgresOperator with Jinja templating and SQL

When trying to use Airflow's templating capabilities (via Jinja2) with the PostgresOperator, I've been unable to get things to render. It's quite possible I'm doing something wrong, but I'm pretty lost as to what the issue might be. Here's an example to reproduce the TemplateNotFound error I've been getting:

airflow.cfg

airflow_home = /home/gregreda/airflow
dags_folder = /home/gregreda/airflow/dags

relevant DAG and variables

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'gregreda',
    'start_date': datetime(2016, 6, 1),
    'schedule_interval': None,
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

this_dag_path = '/home/gregreda/airflow/dags/example_csv_to_redshift'
dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    default_args=default_args
)

/example_csv_to_redshift/csv_to_redshift.py

from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql=this_dag_path + '/copy_to_redshift.sql',
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)

/example_csv_to_redshift/copy_to_redshift.sql

COPY public.table_foobar FROM 's3://mybucket/test-data/import/foobar.csv'
CREDENTIALS 'aws_access_key_id={{ AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ AWS_SECRET_ACCESS_KEY }}'
CSV
NULL as 'null'
IGNOREHEADER as 1;

Calling airflow render example_csv_to_redshift load_table 2016-06-14 throws the exception below. Note that I'm running into this issue with another DAG as well, which is why you'll also see a path under example_redshift_query_to_csv mentioned in the traceback.

[2016-06-14 21:24:57,484] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-06-14 21:24:57,565] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-06-14 21:24:57,596] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2016-06-14 21:24:57,763] {models.py:154} INFO - Filling up the DagBag from /home/gregreda/airflow/dags
[2016-06-14 21:24:57,828] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files
    setattr(self, attr, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql
[2016-06-14 21:24:57,834] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files
    setattr(self, attr, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 359, in render
    ti.render_templates()
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1409, in render_templates
    rendered_content = rt(attr, content, jinja_context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2017, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 812, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 774, in _load_template
    cache_key = self.loader.get_source(self, name)[1]
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql

Any ideas towards a fix are much appreciated.

asked Jun 14 '16 by Greg Reda


2 Answers

For a bit more control, instantiate your DAG with the template_searchpath param, then just use the filename in the operator.

:param template_searchpath: This list of folders (non relative)
    defines where jinja will look for your templates. Order matters.
    Note that jinja/airflow includes the path of your DAG file by
    default
:type template_searchpath: string or list of strings

As @yannicksse suggested, applying this practice to your original dag would look like this:

dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    template_searchpath=[this_dag_path],  # here
    default_args=default_args
)

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='copy_to_redshift.sql',  # and here
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)
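With this change, re-running airflow render example_csv_to_redshift load_table 2016-06-14 should resolve copy_to_redshift.sql against this_dag_path and print the rendered SQL instead of raising TemplateNotFound.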

although, personally, I'd put all the templates in a subfolder, along the lines of the sketch below.
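For illustration, here's a minimal sketch of that layout. The sql/ subfolder name and the use of os.path.join are my own additions, not something from the question:

import os

# hypothetical layout:
#   example_csv_to_redshift/
#       csv_to_redshift.py
#       sql/
#           copy_to_redshift.sql
dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    # absolute path to the templates folder
    template_searchpath=[os.path.join(this_dag_path, 'sql')],
    default_args=default_args
)

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='copy_to_redshift.sql',  # just the filename; resolved via template_searchpath
    postgres_conn_id='postgres_redshift',
    dag=dag  # params, autocommit, etc. as before
)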

answered by marengaz


Standard PEBCAK error.

There was an issue with how I specified the path to the SQL template within the Airflow task: it needs to be relative, since Airflow includes the DAG file's directory in Jinja's search path by default.

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='/copy_to_redshift.sql',
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)

Additionally, the SQL template needed a small change: anything passed via the operator's params argument must be referenced with the params. prefix (note params.AWS_ACCESS_KEY_ID this time):

COPY public.pitches FROM 's3://mybucket/test-data/import/heyward.csv'
CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ params.AWS_SECRET_ACCESS_KEY }}'
CSV
NULL as 'null'
IGNOREHEADER as 1;
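
As a side note, here's a quick way to see why the prefix matters, using plain Jinja2 rather than Airflow itself (my own illustration): Airflow passes the operator's params dict into the template context under the key params.

from jinja2 import Template

sql = "aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }}"

# the operator's params dict is exposed under the 'params' key
print(Template(sql).render(params={'AWS_ACCESS_KEY_ID': 'AKIA...'}))
# prints: aws_access_key_id=AKIA...

# a bare {{ AWS_ACCESS_KEY_ID }} is undefined in that context and, with
# Jinja's default Undefined, silently renders as an empty string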
answered by Greg Reda