
Airflow: when on_success_callback execute a function with parameters

I want to execute a function with one parameter that I pass from a task.

Here's my function with state parameter:

def sns_notify(state):
    client = boto3.client('sns')
    if state == "failed":
        message = config.get('sns', 'message') + state
    else:
        message = config.get('sns', 'message') + state
    response = client.publish(TargetArn=config.get('sns', 'target_arn'),
                              Message=message,
                              Subject=config.get('sns', 'subject'))
    return response

Here are my tasks, passing state as the parameter:

t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
                   on_success_callback=sns_notify("ok"), dag=dag)

t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
                   on_success_callback=sns_notify("failed"), dag=dag)

When I run the DAG, the function doesn't stop sending mails (in this example).

Omar14 asked Mar 08 '23 20:03


2 Answers

Every time Airflow loads the DAG file it executes sns_notify("ok"), because you are calling the function rather than passing it. Instead, pass just the function object sns_notify (no parentheses); Airflow will call it later with the task context as its argument. See docs: https://airflow.apache.org/code.html
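The difference between calling a function and passing it can be seen without Airflow at all. The following is a minimal sketch in plain Python; `notify`, `FakeOperator` and `calls` are hypothetical stand-ins (not Airflow APIs), used only to show when the function body actually runs.

```python
# Hypothetical stand-ins: none of these names are Airflow APIs.
calls = []  # records every time notify() actually runs


def notify(state):
    calls.append(state)
    return "sent " + state


class FakeOperator:
    """Mimics an operator that stores a callback for later use."""

    def __init__(self, on_success_callback=None):
        self.on_success_callback = on_success_callback


# Wrong: notify("ok") runs right here, while the file is being parsed,
# and the operator stores its return value (a string), not a callable.
wrong = FakeOperator(on_success_callback=notify("ok"))
calls_after_wrong = list(calls)

# Right: only the function object is stored; nothing runs yet.
right = FakeOperator(on_success_callback=notify)
calls_after_right = list(calls)

print(callable(wrong.on_success_callback))
print(callable(right.on_success_callback))
```

In the "wrong" form the notification goes out once per parse of the file, which would explain the repeated mails if the DAG file is loaded over and over.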

trigger_rule controls when a task runs relative to its upstream dependencies, so it is not relevant to on_success_callback.

I'm not sure how to pass variables to this callback however - came here looking for answers!

hoju answered Apr 07 '23 00:04


Hoju pointed out the exact error.

You can use functools.partial to bind the extra argument ahead of time, so that what you pass is still a callable:

from functools import partial

# Airflow calls the callback with one positional argument (the context),
# so sns_notify must accept it after state: def sns_notify(state, context)
send_success_notification = partial(sns_notify, "OK")
t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
                   on_success_callback=send_success_notification, dag=dag)

send_failure_notification = partial(sns_notify, "FAILED")
t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
                   on_success_callback=send_failure_notification, dag=dag)
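
To check how the bound argument and the context fit together, here is a minimal sketch that runs without Airflow or boto3. It assumes the callback is invoked with a single positional argument, the context dict; `fake_publish`, `sent` and the sample context dicts are hypothetical stand-ins for the SNS call and for whatever the scheduler would really pass.

```python
from functools import partial

sent = []  # hypothetical stand-in for messages published to SNS


def fake_publish(message):
    sent.append(message)


def sns_notify(state, context):
    # state is bound ahead of time by partial(); context is supplied
    # later by the caller (Airflow, in the real DAG).
    # The "task_id" key is illustrative only; the real Airflow context
    # has different keys.
    task_id = context.get("task_id", "unknown")
    fake_publish("task " + task_id + ": " + state)


send_success_notification = partial(sns_notify, "OK")
send_failure_notification = partial(sns_notify, "FAILED")

# Nothing has been sent at definition time.
sent_at_definition = list(sent)

# Simulate the scheduler invoking the callbacks with a context dict.
send_success_notification({"task_id": "Dummy-1"})
send_failure_notification({"task_id": "Dummy-2"})

print(sent)
```

A lambda such as `lambda context: sns_notify("OK", context)` would behave the same way; partial just avoids writing the wrapper by hand.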

Gaurang Shah answered Apr 07 '23 02:04