With assistance from this answer https://stackoverflow.com/a/41730510/4200352 I am executing a Python file.
I am using PythonOperator and trying to include the execution date as an argument passed to the script.
I believe I can access it somehow through kwargs['execution_date'].
The code below fails:
DAG.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import sys
import os
sys.path.append(os.path.abspath("/home/glsam/OmegaAPI/airflow/scripts/PyPer_ogi_simple"))
from update_benchmarks import *
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 23),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('run_pyPer', default_args=default_args)
update_BM_G027 = PythonOperator(
    task_id='update_BM_G027',
    python_callable=update_bmk,
    dag=dag,
    op_kwargs={
        'bmk_code': 'G027',
        'is_hedged': False,
        'from_date': kwargs['execution_date'],  # NameError: 'kwargs' is not defined at DAG-parse time
    })
Or do I maybe need to use this answer to get the date and then XCom it to the task? https://stackoverflow.com/a/36754930/4200352
From the PythonOperator docs:

op_kwargs (dict) – A dict of keyword arguments to pass to python_callable.
provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates.
This is really a bit confusing and not very well documented.
You are already using the PythonOperator.
Now just add the option
provide_context=True,
and extend your callable's signature to accept the context, e.g.
update_bmk(bmk_code, is_hedged, **context)
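In update_benchmarks.py that is just a signature change; a minimal sketch (the body of update_bmk is assumed here):

def update_bmk(bmk_code, is_hedged, **context):
    # bmk_code and is_hedged still arrive via op_kwargs;
    # Airflow collects its runtime keyword arguments in 'context'
    ...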
Now, within your function you will have access to all information about the task, including the execution date like so:
task_instance = context['task_instance']
execution_date = context['execution_date']
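Applied to the DAG above, the task would then look something like this (a sketch: from_date is dropped from op_kwargs, because the function now reads the execution date from the context):

update_BM_G027 = PythonOperator(
    task_id='update_BM_G027',
    python_callable=update_bmk,
    provide_context=True,
    dag=dag,
    op_kwargs={
        'bmk_code': 'G027',
        'is_hedged': False,
    })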
To see a full reference of items in the context, see https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
Those are the docs for the macros, but the same items are available from the context dictionary inside your callable.
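Note that in Airflow 2.x provide_context was removed and the context is passed to the callable automatically. There, op_kwargs is also a templated field, so passing a Jinja template string is an alternative (a sketch, untested):

op_kwargs={
    'bmk_code': 'G027',
    'is_hedged': False,
    'from_date': '{{ ds }}',  # rendered at run time to the run's logical date (YYYY-MM-DD)
}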