 

Adding params as a template_fields in BigQueryOperator

Tags:

airflow

I am trying to template the params field in a BigQuery operator, as below.

t3 = MyBigQueryOperator(
    task_id='autoplay_calc',
    bql='autoplay_calc.sql',
    params={
        "env": deployment,
        "region": region,
        "partition_start_date": '{{ macros.ds_add(ds, -1) }}',
    },
    bigquery_conn_id='gcp_conn',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    provide_context=True,
    destination_dataset_table=reporting_project + '.pa_reporting_public_batch.autoplay_calc',
    dag=dag
    )

I realise that params is not a templated field, so I extended BigQueryOperator as below to make it one.

from airflow.contrib.operators.bigquery_operator import BigQueryOperator


class MyBigQueryOperator(BigQueryOperator):
    template_fields = ('params', 'bql', 'destination_dataset_table')

However, when I run the code, the params field does not seem to be rendered, as I receive the error message below:

Could not cast literal "{{ macros.ds_add(ds, -1) }}
Anthony Liu asked Apr 14 '19 at 15:04


2 Answers

Short answer: params does not support templating as it's a dictionary and it would require applying jinja2 to key-value pairs. You cannot add the support by just extending the template_fields attribute.

Dalar answered Oct 20 '22 at 12:10


The problem is that you can't simply add 'params' to your 'template_fields'. When Airflow renders the other templated fields (such as the SQL), it uses the 'params' stored in the 'context' dictionary, not the copy you just rendered on the operator.
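To see the ordering problem concretely, here is a toy simulation in plain Python. toy_render and ToyOperator are hypothetical stand-ins, not Airflow APIs: toy_render does a single pass of {{ name }} / {{ params.key }} substitution (like Jinja, it does not re-render the values it inserts), and the context holds its own copy of params taken before rendering, which is the behaviour described above.

```python
import re
from typing import Any, Dict

# Matches simple placeholders such as {{ ds_nodash }} or {{ params.table }}.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def toy_render(template: str, context: Dict[str, Any]) -> str:
    """Hypothetical stand-in for Jinja: one pass of dotted-name lookups in context."""
    def lookup(match: "re.Match[str]") -> str:
        value: Any = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return _PLACEHOLDER.sub(lookup, template)


class ToyOperator:
    """Hypothetical operator that renders its template_fields in order (the naive fix)."""
    template_fields = ("params", "sql")

    def __init__(self, sql: str, params: Dict[str, str]) -> None:
        self.sql = sql
        self.params = params

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, dict):
                value = {k: toy_render(v, context) for k, v in value.items()}
            else:
                value = toy_render(value, context)
            setattr(self, name, value)


op = ToyOperator(
    sql="SELECT * FROM {{ params.table }}",
    params={"table": "project.dataset.table${{ds_nodash}}"},
)
# The context gets its own copy of params, taken before any rendering happens.
context = {"ds_nodash": "20200101", "params": dict(op.params)}
op.render_template_fields(context)

print(op.params["table"])  # project.dataset.table$20200101 (rendered on the operator)
print(op.sql)              # SELECT * FROM project.dataset.table${{ds_nodash}}
```

The operator's own params end up rendered, but the SQL was rendered from the stale copy in the context, so the macro leaks through unrendered.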

There are multiple ways to work around that.

Overriding context

The class largely speaks for itself; I've kept the original docstring because it explains the problem well.

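from typing import Dict, Optional

import jinja2
# Airflow 1.10-era import path, as in the question; in Airflow 2 the equivalent operator is
# BigQueryExecuteQueryOperator from airflow.providers.google.cloud.operators.bigquery
from airflow.contrib.operators.bigquery_operator import BigQueryOperator


class ExtendedBigQueryOperator(BigQueryOperator):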
    """
    For parameters in 'params' containing macros, those macros are not templated because 'params' is not templated.
    Example:
        operator = ExtendedBigQueryOperator(
                task_id='test',
                sql="SELECT {{ params.columns }} FROM {{ params.bq_table_id }}",
                params={
                    'columns': "*",
                    'bq_table_id': "project_id.dataset.id.table_id${{ds_nodash}}"}
            )
    Here, 'columns' does not contain macros and will be correctly templated, but 'bq_table_id' will be templated as
    'project_id.dataset.id.table_id${{ds_nodash}}' instead of 'project_id.dataset.id.table_id$20200101'
    (if ds was 2020-01-01).

    Just making 'params' a template_fields won't work, because the sql is templated using the params in the 'context'
    dictionary and not the new templated one. We need to render params and add it to the 'context' dictionary in the
    'render_template_fields' method.
    """
    def render_template_fields(self, context: Dict, jinja_env: Optional[jinja2.Environment] = None) -> None:
        """ Add the rendered 'params' to the context dictionary before running the templating """
        # Like the original method, get the env if not provided
        if not jinja_env:
            jinja_env = self.get_template_env()

        # Run the render template on params and add it to the context
        if self.params:
            context['params'] = self.render_template(self.params, context, jinja_env, set())

        # Call the original method
        super().render_template_fields(context=context, jinja_env=jinja_env)
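The same kind of toy model (hypothetical helpers again, not Airflow APIs) sketches why the override works: params is rendered first and the result is written into context['params'], so the SQL is then rendered against the already-rendered values.

```python
import re
from typing import Any, Dict

# Matches simple placeholders such as {{ ds_nodash }} or {{ params.columns }}.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def toy_render(template: str, context: Dict[str, Any]) -> str:
    """Hypothetical stand-in for Jinja: one pass of dotted-name lookups in context."""
    def lookup(match: "re.Match[str]") -> str:
        value: Any = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return _PLACEHOLDER.sub(lookup, template)


def render_with_params_first(sql: str, params: Dict[str, str], context: Dict[str, Any]) -> str:
    """Mirror of the override: render params, store them in the context, then render the SQL."""
    context["params"] = {k: toy_render(v, context) for k, v in params.items()}
    return toy_render(sql, context)


context = {"ds_nodash": "20200101", "params": {}}
sql = render_with_params_first(
    "SELECT {{ params.columns }} FROM {{ params.bq_table_id }}",
    {"columns": "*", "bq_table_id": "project_id.dataset.id.table_id${{ds_nodash}}"},
    context,
)
print(sql)  # SELECT * FROM project_id.dataset.id.table_id$20200101
```

As in the override, a params value can use macros such as ds_nodash, but if it referenced another params entry it would see that entry's unrendered value, since context['params'] is only replaced once all values have been rendered.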

Using .format()

This solution is not recommended; I've kept it here because it is what I used originally.

A quick-and-dirty workaround is to create a custom class that runs .format() on the SQL text:

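# Airflow 1.10-era import path, as in the question; adjust it for your Airflow version
from airflow.contrib.operators.bigquery_operator import BigQueryOperator


class ExtendedBigQueryOperator(BigQueryOperator):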
    """
    For parameters in 'params' containing macros, the macros are not templated because 'params' is not templated.
    Just adding 'params' to template_fields won't work either, because the sql is templated using the original params and not the newly templated ones (or something along these lines).

    So instead of using the templating from Jinja, we use the 'format' string method, which is executed at the pre_execute stage.

    This means that you can use params with macros in your sql, but in 'format' syntax: single braces and no 'params.' prefix.
    For example, instead of {{params.process_table}}, you would use {process_table}

    Note: I always use single braces, even for params without macros, for consistency, but it is not necessary.
    """
    template_fields = ["params", *BigQueryOperator.template_fields]

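    def pre_execute(self, context):
        super().pre_execute(context)
        # By now Jinja has rendered params and sql; fill the {name} placeholders from params
        self.sql = self.sql.format(**self.params)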

Your SQL then stays exactly the same, except that every variable from params must be written in single braces instead of double braces (Airflow macros should be passed in as params values), and you need to remove the 'params.' prefix.
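A minimal sketch of the pre_execute step using plain strings instead of an operator. The params dict stands in for self.params after Jinja has rendered it, and the table name and SQL are made-up examples. It also shows one reason the approach is fragile: literal braces in the SQL, such as a JSON string, make str.format() fail.

```python
# Hypothetical values: in the operator these would be self.params after Jinja rendering.
params = {"process_table": "project.dataset.table$20190413"}

# SQL written for the .format() approach: single braces, no 'params.' prefix.
sql = "SELECT * FROM `{process_table}` WHERE status = 'done'"

# This is essentially what ExtendedBigQueryOperator.pre_execute does.
formatted = sql.format(**params)
print(formatted)  # SELECT * FROM `project.dataset.table$20190413` WHERE status = 'done'

# Caveat: literal braces in the SQL (here a JSON string) are parsed as format fields.
json_sql = """SELECT JSON_VALUE('{"a": 1}', '$.a') FROM `{process_table}`"""
try:
    json_sql.format(**params)
    format_failed = False
except (KeyError, ValueError, IndexError) as err:
    format_failed = True
    print("str.format() cannot handle the literal braces:", repr(err))
```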

Milo Parigi answered Oct 20 '22 at 13:10