Access to the params argument in a custom operator in Apache Airflow

The Problem

I'd like to pass in a list of values or indeed any value as an argument to a custom Operator, modify the value(s) in the operator, then access those values in a sql template via the {{ params }} macro.

Current Setup

Here are the relevant parts of my setup, slightly contrived for clarity.

The DAG definition:

from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 27),
    'provide_context': True,
    'depends_on_past': True
}

dag = DAG(
    'etl',
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    max_active_runs=1)

process_product_dim = ProcessDimensionOperator(
    task_id='process_product_dim',
    mysql_conn_id='mysql_dwh',
    sql='process_dimension.sql',
    database='dwh',
    col_names=[
        'id',
        'name',
        'category',
        'price',
        'available',
        'country',
    ],
    t_name='products',
    dag=dag)

The Operator definition:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class ProcessDimensionOperator(BaseOperator):
    template_fields = (
        'sql',
        'parameters')
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            sql,
            t_name,
            col_names,
            database,
            mysql_conn_id='mysql_default',
            parameters=None,  # must be accepted here, otherwise the assignment below raises NameError
            *args, **kwargs):
        super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.t_name = t_name
        self.col_names = col_names
        self.database = database
        self.mysql_conn_id = mysql_conn_id
        self.parameters = parameters

    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)

        self.params['col_names'] = self.col_names
        self.params['t_name'] = self.t_name
        self.params['match_statement'] = self.construct_match_statement(self.col_names)

        hook.run(sql=self.sql)

    def construct_match_statement(self, cols):
        map_list = map(lambda x: f'and t.{x} = s.{x}', cols[1:])

        return ' '.join(map_list)

process_dimension.sql

create table if not exists staging.{{ params.t_name }};

select
    *
from
    source.{{ params.t_name }} as source
join
    target.{{ params.t_name }} as target
    on source.id = target.id {{ params.match_statement }}

But this throws errors, since {{ params.t_name }} and {{ params.match_statement }} both render as null.

What I've tried

  • Setting t_name and col_names in the params argument at task definition and leaving the map/join logic in the sql template. This works, but I'd like to keep logic out of the templates if possible.
  • Passing params={xxx} into the parent constructor: super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
  • Passing the values into the hook.run() method as parameters={xxx} and templating them with %(x)s. This causes issues because the variables render with quotation marks around them, which breaks various sql statements.

I'm quite new to Python and Airflow, so I may well be missing something obvious. Any help would be greatly appreciated, thanks!

dbatten asked Oct 16 '22 06:10



1 Answer

Same here. I've just spent a few hours (days?) tracking down the cause of this problem (god save IPython.embed and logging). As of Airflow 1.10.3, it's caused by TaskInstance.render_templates(), which updates only the task attribute, not the Jinja context, after rendering any of the template_fields or template_exts.

Therefore you just have to use

{{ task.params.whatever }}

instead of

{{ params.whatever }}

in your .sql template files.
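To make the difference concrete, here is a toy model of that behaviour. It is only a sketch under assumptions: str.format stands in for Jinja, and FakeTask and build_context are made-up names, not Airflow APIs. The one thing it mirrors is the claim above, that the context's params entry is a copy while task is the live object.

```python
# Toy model only: str.format stands in for Jinja, and FakeTask /
# build_context are hypothetical names, not part of Airflow.


class FakeTask:
    """Stand-in for an operator instance that carries a params dict."""

    def __init__(self, params):
        self.params = params


def build_context(task):
    """Build a render context as described in the answer:
    'params' is a copy taken now, 'task' is the live object."""
    return {"params": dict(task.params), "task": task}


task = FakeTask({"t_name": "unset"})
context = build_context(task)

# The task's params change after the context has been built.
task.params["t_name"] = "products"

# Lookup through the copied dict still sees the old value.
stale = "source.{params[t_name]}".format(**context)
# Lookup through the task object sees the current value.
live = "source.{task.params[t_name]}".format(**context)

print(stale)  # source.unset
print(live)   # source.products
```

In this model the task.params spelling only helps if the value is already on the task when the template is rendered. A value assigned after rendering would be missed by both spellings.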

As a matter of fact, if the Jinja context were updated continuously, you would have to pay close attention to the order of the templates and the dependencies between them, because rendering would become nested/recursive. It could also have performance downsides.

Also, I would not recommend using "parameters" (which is not the same as "params"), as they seem to be intended to be passed to database cursors as query parameters, and then you won't be able to pass numbers/integers, column or table names, or simply an SQL fragment (e.g. where, having, limit, ...).
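A small illustration of why cursor parameters can't carry table names. This is a sketch using the standard-library sqlite3 module as a stand-in, not MySqlHook; the MySQL driver quotes values differently, but the idea that bound parameters are values rather than pieces of SQL is the same. The table and column names are made up for the example.

```python
import sqlite3

# In-memory database with a hypothetical products table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, name TEXT)")
conn.execute("INSERT INTO products VALUES (?, ?)", (1, "widget"))

# A bound parameter works where SQL expects a value.
rows = conn.execute(
    "SELECT name FROM products WHERE id = ?", (1,)
).fetchall()

# The same mechanism cannot supply a table name.
try:
    conn.execute("SELECT name FROM ?", ("products",))
    identifier_error = None
except sqlite3.OperationalError as exc:
    identifier_error = exc

# Template-style substitution builds the SQL text before the database
# sees it, so identifiers work (only do this with trusted values).
t_name = "products"
templated = conn.execute(
    "SELECT name FROM {}".format(t_name)
).fetchall()

print(rows)
print(identifier_error)
print(templated)
```

This is the same split as in the question: t_name and match_statement are pieces of SQL, so they belong in template rendering (params), not in cursor parameters.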

Andor answered Oct 20 '22 10:10
