I'd like to pass a list of values (or indeed any value) as an argument to a custom operator, modify the value(s) in the operator, and then access those values in a SQL template via the {{ params }} macro.
Here are the relevant parts of my setup, slightly contrived for clarity.
The DAG definition:
from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator

# Placeholder: the original snippet references tmpl_search_path without defining it.
tmpl_search_path = '/path/to/sql/templates'

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 27),
    'provide_context': True,
    'depends_on_past': True
}

dag = DAG(
    'etl',
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    max_active_runs=1)

process_product_dim = ProcessDimensionOperator(
    task_id='process_product_dim',
    mysql_conn_id='mysql_dwh',
    sql='process_dimension.sql',
    database='dwh',
    col_names=[
        'id',
        'name',
        'category',
        'price',
        'available',
        'country',
    ],
    t_name='products',
    dag=dag)
The Operator definition:
from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ProcessDimensionOperator(BaseOperator):
    template_fields = ('sql', 'parameters')
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            sql,
            t_name,
            col_names,
            database,
            mysql_conn_id='mysql_default',
            parameters=None,  # was missing from the signature, causing a NameError below
            *args, **kwargs):
        super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.t_name = t_name
        self.col_names = col_names
        self.database = database
        self.mysql_conn_id = mysql_conn_id
        self.parameters = parameters

    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        # inject the (modified) values into params for the SQL template to pick up
        self.params['col_names'] = self.col_names
        self.params['t_name'] = self.t_name
        self.params['match_statement'] = self.construct_match_statement(self.col_names)
        hook.run(sql=self.sql)

    def construct_match_statement(self, cols):
        # build an "and t.<col> = s.<col>" clause for every column after the first
        return ' '.join(f'and t.{x} = s.{x}' for x in cols[1:])
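For reference, given the col_names passed in the DAG above, construct_match_statement skips the first column (the join key) and produces:

cols = ['id', 'name', 'category', 'price', 'available', 'country']
' '.join(f'and t.{x} = s.{x}' for x in cols[1:])
# -> 'and t.name = s.name and t.category = s.category and t.price = s.price
#     and t.available = s.available and t.country = s.country'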
The SQL template, process_dimension.sql:
create table if not exists staging.{{ params.t_name }};

select
    *
from
    source.{{ params.t_name }} as source
join
    target.{{ params.t_name }} as target
on source.id = target.id {{ params.match_statement }}
But this throws errors, since {{ params.t_name }} and {{ params.match_statement }} render as null.
Things I have tried:

- Hardcoding t_name and c_name in the params argument at task definition and leaving the map/join logic in the SQL template. This works, but I'd like to keep logic out of the templates if possible.
- Passing params={xxx} into the super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs) call.
- Passing the values to the hook.run() method as parameters={xxx} and templating them with %(x)s, but this causes issues as it renders with quotation marks around the variables, which messes up various SQL statements.

I'm quite new to Python and Airflow, so I may well be missing something obvious. Any help would be greatly appreciated, thanks!
Same here. I've just spent a few hours (days?) finding the cause of the problem (god save IPython.embed and logging). As of Airflow 1.10.3, it's caused by TaskInstance.render_templates(), which won't update the Jinja context, only the task attribute, after rendering any of the template_fields or template_exts. See it here!
Therefore you just have to use {{ task.params.whatever }} instead of {{ params.whatever }} in your .sql template files.
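Applied to the template from the question, that means (same SQL, only the macro prefix changes):

create table if not exists staging.{{ task.params.t_name }};

select
    *
from
    source.{{ task.params.t_name }} as source
join
    target.{{ task.params.t_name }} as target
on source.id = target.id {{ task.params.match_statement }}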
As a matter of fact, if the Jinja context were updated continuously, one would really have to pay attention to the order and dependencies of the templates; it would amount to a kind of nested/recursive rendering. It could also have performance downsides.
Also, I would not recommend using parameters (which is not the same as params), as it seems to be intended for passing values to database cursors as query parameters; with it, you won't be able to pass numbers/integers, column or table names, or simply an SQL fragment (e.g. where, having, limit, ...).
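A minimal sketch of that limitation, reusing the mysql_dwh connection from the question (the statements themselves are just illustrative):

from airflow.hooks.mysql_hook import MySqlHook

hook = MySqlHook(mysql_conn_id='mysql_dwh')

# Fine: 'SE' is a literal value, so the DB-API cursor can bind and escape it.
hook.run(
    sql="delete from staging.products where country = %(country)s",
    parameters={'country': 'SE'},
)

# Broken: identifiers get quoted like string literals, yielding e.g.
# "select * from 'products'", which is invalid SQL.
# hook.run(sql="select * from %(t_name)s", parameters={'t_name': 'products'})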