
Kubeflow Pipeline Termination Notification

I want to send a Slack notification when a pipeline terminates because of an error. I tried to implement this with ExitHandler, but it seems an ExitHandler can't depend on any op. Does anyone have a good idea?

Wenmin Wu asked Aug 15 '19 10:08



1 Answer

I found a solution that uses ExitHandler. I'm posting my code below in the hope that it helps someone else.


from kfp import dsl
from kubernetes.client import V1ConfigMapKeySelector, V1EnvVar, V1EnvVarSource


def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """
    Builds a container op that posts `status` to `slack_channel` via slack-cli.
    """
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
    )
    # Read the Slack token from the 'workspace-config' ConfigMap.
    send_slack_op.add_env_variable(V1EnvVar(
        name='SLACK_CLI_TOKEN',
        value_from=V1EnvVarSource(
            config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN')
        )
    ))
    return send_slack_op

@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
def ml_pipeline(
    param1,
    param2,
    param3,
):
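    # `slack_channel` must be defined elsewhere, e.g. as a module-level constant
    # or an extra pipeline parameter.
    exit_task = slack_notification(
        slack_channel=slack_channel,
        name="supply-forecasting",
        # Argo fills in {{workflow.name}} and {{workflow.status}} when the exit handler runs.
        status="Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
        is_exit_handler=True
    )

    # Every op created inside this block is covered by the exit handler.
    with dsl.ExitHandler(exit_task):
        # put other tasks here
        pass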
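
Note that the exit task above runs whether the pipeline succeeds or fails. If you only want a message on failure, as in the question, one option is to check {{workflow.status}} inside the shell command itself. Below is a minimal sketch of that idea, not part of the original answer. `build_failure_only_command` is a hypothetical helper name, it assumes `slack_channel` is a plain string (not a pipeline parameter), and it relies on Argo reporting `Succeeded` for a successful run.

```python
import shlex


def build_failure_only_command(slack_channel: str, message: str) -> str:
    """Return a shell command that posts to Slack only if the run did not succeed.

    Hypothetical helper: /send-message.sh and its -d flag are taken from the
    answer above; everything else here is an illustrative assumption.
    """
    # Quote both arguments so spaces or quotes in them cannot break the command.
    send = "/send-message.sh -d {} {}".format(
        shlex.quote(slack_channel), shlex.quote(message)
    )
    # Argo substitutes {{workflow.status}} before the container starts, so the
    # shell compares a literal such as "Failed" against "Succeeded".
    return 'if [ "{{workflow.status}}" != "Succeeded" ]; then ' + send + "; fi"


if __name__ == "__main__":
    print(build_failure_only_command(
        "ml-alerts",
        "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
    ))
```

The returned string would take the place of the single entry in `arguments` inside `slack_notification`, with `command=['sh', '-c']` left unchanged.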

Wenmin Wu answered Nov 25 '22 06:11
