I am trying to add custom filters to my Airflow Jinja2 templates.
Since my folders in S3 are laid out as /year/month/day/, my goal is to use `yesterday_ds` in the Variables screen like this:
```
s3://logs.web.com/AWSLogs/{{ yesterday_ds | get_year }}/{{ yesterday_ds | get_month }}/{{ yesterday_ds | get_day }}/
```
I have seen in a PR (which I think has already been merged) that you can do this by passing a `user_defined_filters` parameter in the DAG args when creating the DAG object.
The problem is that even when I do this, I get errors such as `no filter named get_year`.
This is my code:
dag.py
```python
from datetime import timedelta

from airflow import DAG

import utils

dag = DAG(
    dag_id='dag-name',
    default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
    template_searchpath=tmpl_search_path,
    schedule_interval=timedelta(days=1),
    max_active_runs=1,
)
```
utils.py
```python
from datetime import datetime, timedelta


def get_dag_args(**kwargs):
    return {
        'owner': kwargs.get('owner', 'owner_name'),
        'depends_on_past': kwargs.get('depends_on_past', False),
        'start_date': kwargs.get('start_date', datetime.now() - timedelta(1)),
        'email': kwargs.get('email', ['[email protected]']),
        'retries': kwargs.get('retries', 5),
        'provide_context': kwargs.get('provide_context', True),
        'retry_delay': kwargs.get('retry_delay', timedelta(minutes=5)),
        'user_defined_filters': get_date_filters()
    }


def get_date_filters():
    return dict(
        get_year=lambda date_string: date_string.strftime('%Y'),
        get_month=lambda date_string: date_string.strftime('%m'),
        get_day=lambda date_string: date_string.strftime('%d'),
    )
```
Does anybody see where I am mistaken? Thank you!
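One more thing worth checking, independent of the Airflow version: `yesterday_ds` is rendered as a 'YYYY-MM-DD' string, not a date object, so the lambdas above would fail on `.strftime` even once the filters are registered. A minimal sketch, assuming the filters should accept either that string or a date/datetime; `_to_date` is a hypothetical helper name, not part of Airflow:

```python
from datetime import datetime


def _to_date(value):
    """Hypothetical helper: accept a 'YYYY-MM-DD' string (like yesterday_ds) or a date/datetime."""
    if isinstance(value, str):
        # yesterday_ds / ds are plain strings in the template context
        return datetime.strptime(value, "%Y-%m-%d").date()
    # date and datetime objects already support strftime
    return value


def get_date_filters():
    return {
        "get_year": lambda value: _to_date(value).strftime("%Y"),
        "get_month": lambda value: _to_date(value).strftime("%m"),
        "get_day": lambda value: _to_date(value).strftime("%d"),
    }
```

On a version that supports it, the returned dict would be passed to the DAG constructor as `user_defined_filters=get_date_filters()` rather than through `default_args`.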
EDIT
Unfortunately, printing the Jinja environment's filters after the DAG definition shows no custom filters:
```python
jinja_env = dag.get_template_env()
print(jinja_env.filters)
```
Also, if I pass `user_defined_filters` directly as a DAG constructor parameter, as the tests in tests/models.py do, I get this error:
```
Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'
```
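An error like this usually means the installed DAG class simply has no such parameter. A small sketch for checking that up front, assuming you only want to know whether a constructor explicitly declares a given keyword; `accepts_kwarg` is a hypothetical helper built on the standard `inspect` module, not an Airflow API:

```python
import inspect


def accepts_kwarg(func, name):
    """Return True if `func` (a function or class) explicitly declares a parameter `name`.

    For a class, inspect.signature reports the __init__ parameters (without self).
    A bare **kwargs catch-all is deliberately not counted as support.
    """
    try:
        params = inspect.signature(func).parameters
    except (TypeError, ValueError):
        # Some builtins and C extensions expose no signature
        return False
    param = params.get(name)
    return param is not None and param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )

# Usage against an installed Airflow (not run here):
#   import airflow
#   from airflow import DAG
#   print(airflow.__version__, accepts_kwarg(DAG, "user_defined_filters"))
```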
EDIT 2
OK, it turns out I have version 1.8.0, which does not support custom filters. Does anybody know how to install the 1.8.2rc release via pip, or is that not possible?
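Airflow now supports custom filters and macros. Pass them directly to the DAG constructor through the `user_defined_filters` and `user_defined_macros` parameters, not inside `default_args` (which are applied to the operators, not to the DAG).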
Working code example:
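```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 1.x: airflow.operators.bash_operator


def first_day_of_month(any_day):
    return any_day.replace(day=1)


def last_day_of_month(any_day):
    next_month = any_day.replace(day=28) + timedelta(days=4)  # this will never fail
    return next_month - timedelta(days=next_month.day)


def isoformat_month(any_date):
    return any_date.strftime("%Y-%m")


default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}  # example values

with DAG(
    dag_id='generate_raw_logs',
    default_args=default_args,
    schedule_interval=timedelta(minutes=120),
    catchup=False,
    user_defined_macros={
        'first_day_of_month': first_day_of_month,
        'last_day_of_month': last_day_of_month,
    },
    user_defined_filters={
        'isoformat_month': isoformat_month,
    },
) as dag:
    # Macros are called like functions; filters use the pipe syntax
    BashOperator(
        task_id='print_month',
        bash_command='echo {{ execution_date | isoformat_month }} {{ last_day_of_month(execution_date) }}',
    )
```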
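To see what these two parameters do to a template without running a scheduler, here is a sketch with plain Jinja2 (installed as an Airflow dependency). It assumes, as recent Airflow versions appear to do in `DAG.get_template_env`, that macros end up as Jinja globals and filters in `env.filters`; the template string and the date are made-up examples.

```python
from datetime import datetime, timedelta

from jinja2 import Environment


def first_day_of_month(any_day):
    return any_day.replace(day=1)


def last_day_of_month(any_day):
    # Day 28 exists in every month, and 28 + 4 > 31, so this always lands in the next month
    next_month = any_day.replace(day=28) + timedelta(days=4)
    # Subtracting next_month's day-of-month steps back to the last day of any_day's month
    return next_month - timedelta(days=next_month.day)


def isoformat_month(any_date):
    return any_date.strftime("%Y-%m")


env = Environment()
# Macros: callable by name inside {{ ... }}
env.globals.update(first_day_of_month=first_day_of_month, last_day_of_month=last_day_of_month)
# Filters: applied with the pipe syntax
env.filters.update(isoformat_month=isoformat_month)

template = env.from_string(
    "{{ d | isoformat_month }} {{ first_day_of_month(d).day }}-{{ last_day_of_month(d).day }}"
)
rendered = template.render(d=datetime(2024, 2, 15))
print(rendered)  # 2024-02 1-29
```

February 2024 is a leap month, which is a quick way to confirm the `last_day_of_month` trick picks up the 29th.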