
Airflow custom jinja2 filters

I am trying to add custom filters for my airflow jinja2 templates.

Since my folders in S3 are laid out like

/year/month/day/

I want to use yesterday_ds in the Variables screen like this:

s3://logs.web.com/AWSLogs/{{ yesterday_ds | get_year }}/{{ yesterday_ds | get_month }}/{{ yesterday_ds | get_day }}/

I have seen in a PR (which I think is already merged) that you can do this with the 'user_defined_filters' parameter, passed in the dag_args parameter when creating the DAG object.

The problem is that even when I do this, rendering fails with an error such as 'no filter named get_year'.

This is my code:

dag.py

dag = DAG(
    dag_id='dag-name',
    default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
    template_searchpath=tmpl_search_path,
    schedule_interval=timedelta(days=1),
    max_active_runs=1,
)

utils.py

from datetime import datetime, timedelta


def get_dag_args(**kwargs):
    return {
        'owner'               : kwargs.get('owner', 'owner_name'),
        'depends_on_past'     : kwargs.get('depends_on_past', False),
        'start_date'          : kwargs.get('start_date', datetime.now() - timedelta(1)),
        'email'               : kwargs.get('email', ['[email protected]']),
        'retries'             : kwargs.get('retries', 5),
        'provide_context'     : kwargs.get('provide_context', True),
        'retry_delay'         : kwargs.get('retry_delay', timedelta(minutes=5)),
        'user_defined_filters': get_date_filters()
    }


def get_date_filters():
    return dict(
        get_year=lambda date_string: date_string.strftime('%Y'),
        get_month=lambda date_string: date_string.strftime('%m'),
        get_day=lambda date_string: date_string.strftime('%d'),
    )

Does anybody see where I am mistaken? Thank you!

EDIT

Unfortunately, printing the following after the DAG definition shows no custom filters:

jinja_env = dag.get_template_env()
print(jinja_env.filters)

Also, if I try to pass it directly as a DAG constructor parameter, as shown in the tests in tests/models.py, I get:

Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'

EDIT 2

OK, it turns out that I have version 1.8.0, which does not have the filters. Does anybody know how to install the 1.8.2rc version via pip, or is that not possible?

Alberto C. asked Aug 09 '17 08:08


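1 Answer

Airflow now supports custom filters and macros: pass them as the user_defined_filters and user_defined_macros arguments of the DAG constructor itself, not inside default_args.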

Working code example:

from airflow import DAG
from datetime import datetime, timedelta

def first_day_of_month(any_day):
    return any_day.replace(day=1)


def last_day_of_month(any_day):
    # Day 28 exists in every month, so adding 4 days always lands in the next month.
    next_month = any_day.replace(day=28) + timedelta(days=4)
    return next_month - timedelta(days=next_month.day)


def isoformat_month(any_date):
    return any_date.strftime("%Y-%m")


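# Placeholder values; replace with your own default arguments.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2022, 1, 1),
}

with DAG(
        dag_id='generate_raw_logs',
        default_args=default_args,
        schedule_interval=timedelta(minutes=120),
        catchup=False,
        user_defined_macros={
            'first_day_of_month': first_day_of_month,
            'last_day_of_month': last_day_of_month,
        },
        user_defined_filters={
            'isoformat_month': isoformat_month,
        },
) as dag:
    # Define tasks here; their templated fields can use the macros and filters above.
    pass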
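
Applied to the S3 path from the question, the three filters could be sketched as below. This is a minimal sketch, not tested against a running Airflow: it assumes that yesterday_ds reaches the filter as a 'YYYY-MM-DD' string, so each filter parses the string first instead of calling strftime on it directly. The DS_FORMAT constant and the sample date are only illustrative.

```python
from datetime import datetime

# Assumption: the filters receive yesterday_ds as a 'YYYY-MM-DD' string,
# so each one parses the string before formatting it.
DS_FORMAT = '%Y-%m-%d'


def get_year(ds):
    """Return the four-digit year of a 'YYYY-MM-DD' string."""
    return datetime.strptime(ds, DS_FORMAT).strftime('%Y')


def get_month(ds):
    """Return the zero-padded month of a 'YYYY-MM-DD' string."""
    return datetime.strptime(ds, DS_FORMAT).strftime('%m')


def get_day(ds):
    """Return the zero-padded day of a 'YYYY-MM-DD' string."""
    return datetime.strptime(ds, DS_FORMAT).strftime('%d')


def get_date_filters():
    """Return the mapping intended for DAG(user_defined_filters=...)."""
    return {
        'get_year': get_year,
        'get_month': get_month,
        'get_day': get_day,
    }


if __name__ == '__main__':
    ds = '2017-08-08'  # sample value, for illustration only
    # Build the /year/month/day part of the path from the question.
    print('/'.join(f(ds) for f in (get_year, get_month, get_day)))
```

The mapping would then be passed as user_defined_filters=get_date_filters() to the DAG constructor, on an Airflow version that accepts that argument, so that {{ yesterday_ds | get_year }} resolves in templated fields.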
mik-laj answered Oct 07 '22 19:10