I need to be able to access default_args defined as part of the DAG definition from a PythonOperator's python_callable. Maybe it's my unfamiliarity with Python or Airflow in general, but could someone guide me on how to achieve this?
Following is a code sample of what I am trying to achieve:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': '[email protected]',
    'email_on_failure': '[email protected]',
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2017, 5, 15, 23, 20),
    'end_date': datetime(2017, 5, 16, 23, 45),
    'touchfile_path': '/user/myname/touchfiles/',
}

dag = DAG(
    'test',
    default_args=default_args,
    template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'],
    user_defined_macros=dict(SCHEMA_NAME='abc'),
    #schedule_interval='*/2 * * * * ')
    schedule_interval='@once')

def webhdfs_touchfile_create(ds, *args, **kwargs):
    web_hdfs_hook = WebHDFSHook('webhdfs_default')
    client = web_hdfs_hook.get_conn()
    client.write("/user/myname/airflow_hdfs", "stringToWrite")
    pp.pprint(kwargs)
task1 = PythonOperator(
    task_id='task1',
    provide_context=True,  # enabling this passes the context keyword arguments to the callable automatically
    python_callable=webhdfs_touchfile_create,
    templates_dict={'attr1': "{{ default_args['touchfile_path'] }}"},
    dag=dag)
Since templates_dict is the only PythonOperator attribute on which Jinja templating works, how can I retrieve the 'touchfile_path' parameter in there?
python_callable (python callable) – a reference to an object that is callable. op_kwargs – a dictionary of keyword arguments that will get unpacked in your function. op_args – a list of positional arguments that will get unpacked when calling the callable.
The PythonOperator in Airflow is responsible for running any Python code. Just like the BashOperator used before, this and all other operators require a task_id. The task_id is referenced when running a task and displayed in the UI.
op_kwargs (dict) – a dict of keyword arguments to pass to python_callable. provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates.
For Airflow context variables, make sure that Airflow is also installed as part of the virtualenv environment, in the same version as the Airflow version the task runs on. Otherwise you won't have access to most of Airflow's context variables in op_kwargs.
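For example, here is a minimal sketch of the op_kwargs route (reusing the dag, default_args and touchfile_path names from the question; adapt to your setup), which passes the value to the callable as a plain keyword argument instead of going through templating:

from airflow.operators.python_operator import PythonOperator

# assumes `dag` and `default_args` as defined in the question above
def webhdfs_touchfile_create(touchfile_path, **kwargs):
    # the value arrives as an ordinary keyword argument
    print(touchfile_path)

task1 = PythonOperator(
    task_id='task1',
    provide_context=True,
    python_callable=webhdfs_touchfile_create,
    op_kwargs={'touchfile_path': default_args['touchfile_path']},
    dag=dag)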
There are 2 mechanisms for passing variables in Airflow: (1) Jinja templating and (2) specialized operator properties.
Using approach (1), variables can be passed via the user_defined_macros property at the DAG level.
Using approach (2), you should take a look at the specific operator's properties.
Note that some operator properties are processed by Jinja, so you can use template syntax in them.
Here is a working example:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'custom_key1': 'custom_value1',
    'custom_key2': 'custom_value2'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    user_defined_macros=default_args,
)
bash_command = """
echo "access via DAG's user_defined_macros = {{ custom_key1 }}"
echo "access via Operator's params = {{ params.custom_key2 }}"
"""
t1 = BashOperator(
    task_id='print_in_bash_op',
    bash_command=bash_command,
    params=default_args,
    dag=dag,
)

def myfunc(**context):
    print(context['templates_dict']['custom_key1'])
    print(context['templates_dict']['custom_key2'])

t2 = PythonOperator(
    task_id='print_in_python_op',
    python_callable=myfunc,
    templates_dict=default_args,
    provide_context=True,
    dag=dag,
)
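# Alternative: map the user-defined macros into templates_dict explicitly via Jinja expressions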
templates_dict = {
    'custom_key1': '{{ custom_key1 }}',
    'custom_key2': '{{ custom_key2 }}'
}

t3 = PythonOperator(
    task_id='print_in_python_op_2',
    python_callable=myfunc,
    templates_dict=templates_dict,
    provide_context=True,
    dag=dag,
)
t1 >> t2 >> t3
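When the DAG runs, all three tasks resolve the same values: t1 echoes them in Bash (via user_defined_macros and params), t2 reads the literal values straight from templates_dict, and t3 reads them after Jinja has rendered the {{ custom_key1 }} / {{ custom_key2 }} expressions against user_defined_macros.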
Addition based on comments
Whether you can use variables fully depends on the operator.
With approach (2), operators typically expose specialized properties for passing information, such as templates_dict, op_args and op_kwargs on PythonOperator.
For approach (1), such properties must be rendered with Jinja in the operator's code; they are marked as templated in the documentation.
For instance, the properties above are templated properties.
Everywhere Airflow macros can be used, user variables (defined via user_defined_macros) can also be used.
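As a quick way to check which properties of a given operator are templated, you can inspect its template_fields attribute. A minimal sketch, assuming Airflow 2.x import paths:

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# each operator declares its Jinja-rendered properties in template_fields
print(PythonOperator.template_fields)  # e.g. ('templates_dict', 'op_args', 'op_kwargs')
print(BashOperator.template_fields)    # e.g. ('bash_command', 'env')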
In Airflow 2.0, with the TaskFlow API, a "Python callable" is sometimes just a function decorated with @task. In that case, you can retrieve the default args from the context:
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def my_task():
    context = get_current_context()
    email_on_failure = context["dag"].default_args["email_on_failure"]
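Putting that together, here is a minimal runnable sketch (assuming Airflow 2.x; the DAG id, keys and values below are illustrative):

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'touchfile_path': '/user/myname/touchfiles/',
}

@dag(dag_id='default_args_demo', schedule_interval=None,
     start_date=days_ago(1), default_args=default_args)
def default_args_demo():
    @task
    def read_default_args():
        # default_args are available on the DAG object in the runtime context
        context = get_current_context()
        print(context['dag'].default_args['touchfile_path'])

    read_default_args()

dag = default_args_demo()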