
Make custom Airflow macros expand other macros

Is there any way to make a user-defined macro in Airflow which is itself computed from other macros?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

The use case here is to back-port the new Airflow v1.8 next_execution_date macro to work in Airflow v1.7. Unfortunately, this template is rendered without macro expansion:

$ airflow render simple bash_op 2017-08-09 21:00:00
# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "{{ dag.following_schedule(execution_date) }}"
mxxk asked Jun 30 '17

People also ask

Is it possible to create a custom operator in Apache Airflow?

Airflow allows you to create new operators to suit the requirements of you or your team. This extensibility is one of the many reasons that make Apache Airflow powerful. There are two methods that you need to override in a derived class: the constructor, which defines the parameters required for the operator, and execute, which contains the code to run when the task fires.

What does DS mean in Airflow?

Customer Question: On daily tasks, using ds (an Airflow template variable holding the execution date as YYYY-MM-DD) makes sense because we need to process the data of the previous day.

What is Jinja template in Airflow?

Templating in Airflow works exactly the same as templating with Jinja in Python: define your to-be-evaluated code between double curly braces, and the expression will be evaluated at runtime. As we saw in the previous code snippet, execution_date is a variable available at runtime.
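To make the "evaluated at runtime" part concrete, here is a toy, stdlib-only stand-in for the template engine. It is not how Jinja actually works internally (Jinja compiles templates; this just evals each expression), but it shows the mechanics of double-curly expansion against a context dict:

```python
import re
from datetime import datetime

def render(template, context):
    """Evaluate each {{ ... }} expression against the context dict,
    very roughly mimicking what Jinja does at render time."""
    def substitute(match):
        expression = match.group(1).strip()
        # Jinja compiles expressions; this toy version just evals them
        return str(eval(expression, {}, context))
    return re.sub(r'\{\{(.*?)\}\}', substitute, template)

context = {'execution_date': datetime(2017, 8, 9, 21, 0)}
print(render('echo "{{ execution_date.strftime("%Y-%m-%d") }}"', context))
# → echo "2017-08-09"
```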

How to add custom macros to the airflow engine?

Additional custom macros can be added globally through ORM Extensions, or at a DAG level through the DAG.user_defined_macros argument. The Airflow engine also passes a few variables by default that are accessible in all templates, such as ds (the execution date as YYYY-MM-DD) and ds_nodash (the execution date as YYYYMMDD).
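As a sketch of how those two defaults relate to execution_date (the variable names match the Airflow docs; the derivation here is illustrative):

```python
from datetime import datetime

execution_date = datetime(2017, 8, 9, 21, 0)

# ds: the execution date as YYYY-MM-DD
ds = execution_date.strftime('%Y-%m-%d')       # '2017-08-09'
# ds_nodash: the execution date as YYYYMMDD
ds_nodash = execution_date.strftime('%Y%m%d')  # '20170809'
```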


What are templates and macros in Apache Airflow?

Templates and macros in Apache Airflow are really powerful to make your tasks dynamic and idempotent when you need time as input. You can’t hard code a date as the task won’t work anymore if you want to run it in the past or in the future. This date must be dynamic and must change according to when the DAG is executed.

How to debug a macro expansion that is not working?

You should use the airflow render command or check the rendered view in the task instance details of the UI when debugging issues like this. It would likely show you what's going wrong with your assumed macro expansions.


2 Answers

Here are some solutions:

1. Override BashOperator to add some values to the context

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_template(attr, content, context)
        # or in Python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)

The good part with this approach: you can capture some repeated code in your custom operator.

The bad part: you have to write a custom operator to add values to the context, before templated fields are rendered.

2. Do your computation in a user defined macro

Macros are not necessarily values. They can be functions.

In your DAG:

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

The good part: you can define reusable functions to process values available at runtime (XCom values, job instance properties, task instance properties, etc...), and make your function result available to render a template.

The bad part (but not that annoying): you have to import such a function as a user defined macro in every dag where needed.
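To see why passing a function works, here is a stdlib-only sketch of what happens when Jinja evaluates the template. FakeDag is a stand-in for Airflow's DAG (the real following_schedule parses the cron expression; here we hard-code the daily interval from the example):

```python
from datetime import datetime, timedelta

class FakeDag:
    """Stand-in for airflow.DAG with the '0 21 * * *' (daily) schedule."""
    def following_schedule(self, execution_date):
        return execution_date + timedelta(days=1)

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

# Jinja evaluates {{ next_execution_date(dag, execution_date) }} by calling
# the macro function with runtime context values, roughly like this:
context = {'dag': FakeDag(), 'execution_date': datetime(2017, 8, 9, 21, 0)}
result = compute_next_execution_date(context['dag'], context['execution_date'])
print(result)  # 2017-08-10 21:00:00
```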

3. Call your statement directly in your template

This solution is the simplest (as mentioned in Ardan's answer), and probably the right one in your case.

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

Ideal for simple calls like this one. There are also some other objects directly available as macros (like task, task_instance, etc.); even some standard modules are available (like macros.time, ...).
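For instance, the built-in macros.ds_add(ds, days) helper does something along these lines (a sketch, not Airflow's actual implementation):

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    """Add (or subtract, with negative days) days to a YYYY-MM-DD date
    string, mirroring the behaviour of Airflow's built-in macros.ds_add."""
    anchor = datetime.strptime(ds, '%Y-%m-%d')
    return (anchor + timedelta(days=days)).strftime('%Y-%m-%d')

print(ds_add('2017-08-09', 5))   # 2017-08-14
print(ds_add('2017-08-09', -5))  # 2017-08-04
```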

Géraud answered Sep 22 '22


I would vote for writing an Airflow plugin to inject your pre-defined macros. With this method, you can use your pre-defined macros in any operator without declaring anything.

Below are some custom macros that we're using. Example usage: {{ macros.dagtz_next_execution_date(ti) }}

from airflow.plugins_manager import AirflowPlugin
from datetime import datetime, timedelta
from airflow.utils.db import provide_session
from airflow.models import DagRun
import pendulum


@provide_session
def _get_dag_run(ti, session=None):
    """Get the DagRun of the TaskInstance ti

    Args:
        ti (TaskInstance): the TaskInstance object
        session (Session, optional): injected by @provide_session

    Returns:
        DagRun: the DagRun of the TaskInstance ti
    """
    task = ti.task
    dag_run = None
    if hasattr(task, 'dag'):
        dag_run = (
            session.query(DagRun)
            .filter_by(
                dag_id=task.dag.dag_id,
                execution_date=ti.execution_date)
            .first()
        )
        session.expunge_all()
        session.commit()
    return dag_run


def ds_add_no_dash(ds, days):
    """
    Add or subtract days from a YYYYMMDD

    :param ds: anchor date in ``YYYYMMDD`` format to add to
    :type ds: str
    :param days: number of days to add to the ds, you can use negative values
    :type days: int

    >>> ds_add_no_dash('20150101', 5)
    '20150106'
    >>> ds_add_no_dash('20150106', -5)
    '20150101'
    """
    ds = datetime.strptime(ds, '%Y%m%d')
    if days:
        ds = ds + timedelta(days)
    return ds.isoformat()[:10].replace('-', '')


def dagtz_execution_date(ti):
    """Get the TaskInstance execution date (in DAG timezone) as a pendulum object"""
    execution_date_pdl = pendulum.instance(ti.execution_date)
    return execution_date_pdl.in_timezone(ti.task.dag.timezone)


def dagtz_next_execution_date(ti):
    """Get the TaskInstance next execution date (in DAG timezone) as a pendulum object"""
    # For manually triggered dag runs that aren't run on a schedule, next/previous
    # schedule dates don't make sense, and should be set to the execution date for
    # consistency with how execution_date is set for manually triggered tasks, i.e.
    # triggered_date == execution_date.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        next_execution_date = ti.execution_date
    else:
        next_execution_date = ti.task.dag.following_schedule(ti.execution_date)

    next_execution_date_pdl = pendulum.instance(next_execution_date)
    return next_execution_date_pdl.in_timezone(ti.task.dag.timezone)


def dagtz_next_ds(ti):
    """Get the TaskInstance next execution date (in DAG timezone) as a YYYY-MM-DD string"""
    return dagtz_next_execution_date(ti).strftime('%Y-%m-%d')


def dagtz_next_ds_nodash(ti):
    """Get the TaskInstance next execution date (in DAG timezone) as a YYYYMMDD string"""
    return dagtz_next_ds(ti).replace('-', '')


def dagtz_prev_execution_date(ti):
    """Get the TaskInstance previous execution date (in DAG timezone) as a pendulum object"""
    # Same special case for manually triggered dag runs as in
    # dagtz_next_execution_date above.
    dag_run = _get_dag_run(ti)
    if dag_run and dag_run.external_trigger:
        prev_execution_date = ti.execution_date
    else:
        prev_execution_date = ti.task.dag.previous_schedule(ti.execution_date)

    prev_execution_date_pdl = pendulum.instance(prev_execution_date)
    return prev_execution_date_pdl.in_timezone(ti.task.dag.timezone)


def dagtz_prev_ds(ti):
    """Get the TaskInstance previous execution date (in DAG timezone) as a YYYY-MM-DD string"""
    return dagtz_prev_execution_date(ti).strftime('%Y-%m-%d')


def dagtz_prev_ds_nodash(ti):
    """Get the TaskInstance previous execution date (in DAG timezone) as a YYYYMMDD string"""
    return dagtz_prev_ds(ti).replace('-', '')


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "custom_macros"
    macros = [dagtz_execution_date, ds_add_no_dash,
              dagtz_next_execution_date, dagtz_next_ds, dagtz_next_ds_nodash,
              dagtz_prev_execution_date, dagtz_prev_ds, dagtz_prev_ds_nodash]
z1k answered Sep 24 '22