Airflow Python Script with execution_date in op_kwargs

Tags:

python

airflow

With help from this answer (https://stackoverflow.com/a/41730510/4200352), I am executing a Python file.

I use PythonOperator and am trying to include the execution date as an argument passed to the script.

I believe I can access it somehow through kwargs['execution_date'].

The code below fails:

DAG.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

import sys
import os
sys.path.append(os.path.abspath("/home/glsam/OmegaAPI/airflow/scripts/PyPer_ogi_simple"))
from update_benchmarks import *


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 23),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('run_pyPer', default_args=default_args)

update_BM_G027 = PythonOperator(
    task_id='update_BM_G027',
    python_callable=update_bmk,
    dag=dag,
    op_kwargs={
        'bmk_code': 'G027',
        'is_hedged': False,
        'from_date': kwargs['execution_date'],  # fails: 'kwargs' is not defined at module scope
    })

Or do I need to use this answer (https://stackoverflow.com/a/36754930/4200352) to get the date and then pass it to the task via XCom?
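For reference, the failure is a plain Python NameError and can be reproduced outside Airflow; a minimal sketch:

```python
# Minimal reproduction: `kwargs` only exists inside a function that
# declares it, so referencing it at module (DAG-definition) scope
# raises NameError.
try:
    from_date = kwargs['execution_date']
except NameError as exc:
    print(exc)  # prints: name 'kwargs' is not defined
```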

asked Apr 30 '18 by Glenn Sampson



1 Answer

This is really a bit confusing and not very well documented.

You are already using the PythonOperator.

Now just add the option

provide_context=True,

and extend your callable's signature to accept the context, e.g.

def update_bmk(bmk_code, is_hedged, **context):

Now, within your function you will have access to all information about the task, including the execution date like so:

task_instance = context['task_instance']
execution_date = context['execution_date']

To see a full reference of items in the context, see https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html

That page documents the template macros, but the same items are available in the context dictionary.
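Putting it together, a sketch of the callable (names follow the question; the simulated call below stands in for what Airflow does at run time when provide_context=True is set):

```python
from datetime import datetime

def update_bmk(bmk_code, is_hedged, **context):
    # With provide_context=True, Airflow merges the runtime context
    # (execution_date, task_instance, ...) into these keyword arguments,
    # alongside whatever you pass in op_kwargs.
    from_date = context['execution_date']
    return f"updating {bmk_code} (hedged={is_hedged}) from {from_date:%Y-%m-%d}"

# Simulate the call Airflow would make at run time:
print(update_bmk('G027', False, execution_date=datetime(2018, 4, 23)))
# prints: updating G027 (hedged=False) from 2018-04-23
```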

answered Oct 03 '22 by tobi6