I'd like to pass other arguments to my on_failure_callback function, but it only seems to accept "context". How do I pass other arguments to that function, especially since I'd like to define it in a separate module so it can be used in all my DAGs?
My current default_args looks like this:
    default_args = {
        'owner': 'Me',
        'depends_on_past': True,
        'start_date': datetime(2016, 1, 1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
        'on_failure_callback': notify_failure,
        'max_active_runs': 1
    }
If I try something like this, Airflow complains:
    default_args = {
        'owner': 'Me',
        'depends_on_past': True,
        'start_date': datetime(2016, 1, 1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
        'on_failure_callback': notify_failure(context, arg1, arg2),
        'max_active_runs': 1
    }
So I'm not sure how to pass arg1 and arg2 to my notify_failure function, which I would like to define in a separate module that I can simply import into my DAG.
Assuming the args are something you can define at the DAG level, you can use functools.partial, i.e.:
    from datetime import datetime, timedelta
    from functools import partial

    def generic_failure(arg1, arg2, context):
        # do whatever
        pass

    default_args = {
        'owner': 'Me',
        'depends_on_past': True,
        'start_date': datetime(2016, 1, 1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
        # arg1 and arg2 are bound here; Airflow supplies context later
        'on_failure_callback': partial(generic_failure, arg1, arg2),
        'max_active_runs': 1
    }
Calling partial(generic_failure, arg1, arg2) returns a callable that expects whichever arguments of generic_failure remain unbound, which in the example above is just the single parameter context.
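To see the binding outside of Airflow, here is a minimal sketch that calls the partial by hand. The context dict, its task_id key, and the argument values are made up for the demo; the real context is passed in by Airflow and contains more. The second half is a variation that is not part of the answer above: for a callback that takes context first, as in the question, binding the extra arguments by keyword leaves the first positional slot free.

```python
from functools import partial


def generic_failure(arg1, arg2, context):
    # Hypothetical body: report the bound args plus one context value.
    return "{} {} {}".format(arg1, arg2, context["task_id"])


def notify_failure(context, arg1, arg2):
    # Same idea, but with context as the first parameter.
    return "{} {} {}".format(context["task_id"], arg1, arg2)


# Stand-in for the context that would be passed on failure (assumed shape).
fake_context = {"task_id": "load_table"}

# Positional binding: arg1 and arg2 are filled in, context remains.
positional = partial(generic_failure, "a1", "a2")
first = positional(fake_context)

# Keyword binding: context stays free as the only positional argument.
by_keyword = partial(notify_failure, arg1="a1", arg2="a2")
second = by_keyword(fake_context)

print(first)
print(second)
```

Either partial object can be assigned to 'on_failure_callback', since both accept a single positional argument.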
You can use a nested function (a closure) for this:
    def generic_failure(arg1, arg2):
        # Returns a callback that remembers arg1 and arg2
        def failure(context):
            message = 'we have a function that failed with args: {ARG1}, {ARG2}'.format(ARG1=arg1, ARG2=arg2)
            print(message)
            return message
        return failure

    arg1 = 'arg1'
    arg2 = 'arg2'

    default_args = {
        'owner': 'Me',
        'on_failure_callback': generic_failure(arg1, arg2),
    }
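A quick way to check the closure without running a DAG is to call the returned function yourself. This is only a sketch: the empty context dict and the argument values are stand-ins. It also shows that each call to generic_failure produces an independent callback, so several DAGs could import the same factory from a shared module and configure it differently.

```python
def generic_failure(arg1, arg2):
    def failure(context):
        # arg1 and arg2 are captured from the enclosing call.
        return "failed with args: {}, {}".format(arg1, arg2)
    return failure


# Two callbacks built from the same factory with different arguments.
callback_a = generic_failure("arg1", "arg2")
callback_b = generic_failure("x", "y")

fake_context = {}  # stand-in; the real context is supplied by Airflow
message_a = callback_a(fake_context)
message_b = callback_b(fake_context)

print(message_a)
print(message_b)
```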