
How to retrieve default args in python callable

Tags:

airflow

I need to be able to access the default_args defined as part of the DAG definition inside a PythonOperator's python_callable. Maybe it's my unfamiliarity with Python or Airflow in general, but could someone guide me on how to achieve this?

Following is a code sample of what I am trying to achieve:

import pprint as pp
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.webhdfs_hook import WebHDFSHook
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': '[email protected]',
    'email_on_failure': '[email protected]',
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2017, 5, 15, 23, 20),
    'end_date': datetime(2017, 5, 16, 23, 45),
    'touchfile_path': '/user/myname/touchfiles/',
}

dag = DAG(
    'test',
    default_args=default_args,
    template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'],
    user_defined_macros=dict(SCHEMA_NAME='abc'),
    #schedule_interval='*/2 * * * * ')
    schedule_interval='@once')

def webhdfs_touchfile_create(ds, *args, **kwargs):
    web_hdfs_hook = WebHDFSHook('webhdfs_default')
    client = web_hdfs_hook.get_conn()
    client.write("/user/myname/airflow_hdfs","stringToWrite")
    pp.pprint(kwargs)

task1 = PythonOperator(
    task_id='task1',
    provide_context=True,  # enables passing the Airflow context to the callable
    python_callable=webhdfs_touchfile_create,
    templates_dict={'attr1': "{{ default_args['touchfile_path'] }}"},
    dag=dag)

Since templates_dict is the only PythonOperator attribute on which Jinja templating works, how can I retrieve the 'touchfile_path' parameter in there?

Saurabh Mishra asked May 16 '17


2 Answers

There are two mechanisms for passing variables in Airflow:

  • (1) Jinja templating
  • (2) Specialized operator properties

With approach (1), variables can be passed via the user_defined_macros property at the DAG level. With approach (2), you should look at the properties of the specific operator.

Note that some operator properties are processed by Jinja, so you can use template syntax in them.
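As a rough illustration of what approach (1) does to a templated property, here is a minimal, Airflow-free stand-in for the rendering step (the real engine is Jinja2; the `render` helper below is hypothetical, not an Airflow API):

```python
import re

def render(template: str, macros: dict) -> str:
    # Minimal stand-in for Jinja templating: replace each {{ key }}
    # with the corresponding value from the macros dict.
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(macros[m.group(1)]),
                  template)

macros = {'custom_key1': 'custom_value1'}
print(render("access via user_defined_macros = {{ custom_key1 }}", macros))
# access via user_defined_macros = custom_value1
```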

Here is a working example:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'custom_key1': 'custom_value1',
    'custom_key2': 'custom_value2'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    user_defined_macros=default_args,
)

bash_command = """
    echo "access via DAG's user_defined_macros = {{ custom_key1 }}"
    echo "access via Operator's params = {{ params.custom_key2 }}"
"""

t1 = BashOperator(
    task_id='print_in_bash_op',
    bash_command=bash_command,
    params=default_args,
    dag=dag,
)

def myfunc(**context):
    print(context['templates_dict']['custom_key1'])
    print(context['templates_dict']['custom_key2'])


t2 = PythonOperator(
    task_id='print_in_python_op',
    python_callable=myfunc, 
    templates_dict=default_args,
    provide_context=True,
    dag=dag,
)

templates_dict={
    'custom_key1': '{{ custom_key1 }}',
    'custom_key2': '{{ custom_key2 }}'
}

t3 = PythonOperator(
    task_id='print_in_python_op_2',
    python_callable=myfunc, 
    templates_dict=templates_dict,
    provide_context=True,
    dag=dag,
)

t1 >> t2 >> t3

Addition based on comments

Whether variables can be used depends entirely on the operator.

With approach (2), operators typically have specialized properties for passing information, such as:

  • bash_command in BashOperator,
  • op_kwargs in PythonOperator,
  • sql in BigQueryOperator

For approach (1) to work, a property must be rendered with Jinja in the operator's code (such properties are marked as templated in the documentation). The properties listed above are all templated.

In every place where Airflow macros can be used, user variables (defined via user_defined_macros) can also be used.
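To make the data flow for t2 concrete, here is a hedged, Airflow-free sketch of what happens: the operator renders templates_dict against the macros, then hands the rendered dict to the callable through the context. Both `render` and `run_python_operator` below are hypothetical helpers standing in for Airflow's own execution machinery:

```python
def render(value, macros):
    # Naive stand-in for Jinja rendering of a single templated value.
    for key, val in macros.items():
        value = value.replace('{{ ' + key + ' }}', str(val))
    return value

def myfunc(**context):
    # Same shape as the callable in the answer above: read rendered
    # values from templates_dict inside the context.
    print(context['templates_dict']['custom_key1'])

def run_python_operator(python_callable, templates_dict, macros):
    # Hypothetical stand-in for PythonOperator with provide_context=True:
    # render templates_dict, then pass it to the callable via kwargs.
    rendered = {k: render(v, macros) for k, v in templates_dict.items()}
    python_callable(templates_dict=rendered)

run_python_operator(myfunc,
                    {'custom_key1': '{{ custom_key1 }}'},
                    {'custom_key1': 'custom_value1'})
# prints: custom_value1
```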

Ilya Bystrov answered Oct 17 '22


In Airflow 2.0, with the TaskFlow API, a "Python callable" is often just a function decorated with @task. In this case, you can retrieve the default args from the context:

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def my_task():
    context = get_current_context()
    email_on_failure = context["dag"].default_args["email_on_failure"]
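Outside a live Airflow run, the same access pattern can be sketched with a stand-in context. SimpleNamespace here is a hypothetical substitute for the real DAG object, which exposes the dict passed at construction time as its default_args attribute:

```python
from types import SimpleNamespace

# Hypothetical stand-in for what get_current_context() returns:
# context["dag"] is the DAG object; its default_args attribute holds
# the dict passed to the DAG constructor.
context = {"dag": SimpleNamespace(default_args={
    "email_on_failure": False,
    "touchfile_path": "/user/myname/touchfiles/",
})}

email_on_failure = context["dag"].default_args["email_on_failure"]
touchfile_path = context["dag"].default_args["touchfile_path"]
print(email_on_failure, touchfile_path)
# False /user/myname/touchfiles/
```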
Noumenon answered Oct 17 '22