i want to execute a function with one parameter that I pass from a task.
Here's my function with state parameter:
def sns_notify(state):
client = boto3.client('sns')
if state == "failed":
message = config.get('sns', 'message') + state
else:
message = config.get('sns', 'message') + state
response = client.publish(TargetArn=config.get('sns', 'target_arn'),
Message=message,
Subject=config.get('sns', 'subject'))
return response
Here's my tasks with state as param:
t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
on_success_callback=sns_notify("ok"), dag=dag)
t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
on_success_callback=sns_notify("failed"), dag=dag)
When i run the dag the function doesn't stop sending mails (for this exemple)
Every time the DAG is loaded by airflow it will execute sns_notify("ok")
because you are calling the function. You need to instead pass just the function pointer sns_notify
, which will receive context
. See docs: https://airflow.apache.org/code.html
trigger_rule
relates to how dependencies tasks are executed so is not relevant to on_success_callback
.
I'm not sure how to pass variables to this callback however - came here looking for answers!
Hoju pointed out the exact error.
you can use functional programming to help solve this issue.
from functools import partial
send_success_notification = partial(sns_notify, "OK")
t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
on_success_callback=send_success_notification , dag=dag)
send_failure_notification = partial(sns_notify, "FAILED")
t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
on_success_callback=send_failure_notification, dag=dag)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With