 

How to pass data or files between Kubeflow containerized components in Python

I'm exploring Kubeflow as an option to deploy and connect various components of a typical ML pipeline. I'm using Docker containers as Kubeflow components, and so far I've been unable to successfully use the ContainerOp.file_outputs object to pass results between components.

Based on my understanding of the feature, creating and saving to a file that's declared as one of the file_outputs of a component should cause it to persist and be accessible for reading by the following component.

This is how I attempted to declare this in my pipeline python code:

import kfp.dsl as dsl 
import kfp.gcp as gcp

@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
    data_collector = dsl.ContainerOp(
        name='data collector', 
        image='eu.gcr.io/kubeflow-demo-254012/data-collector',
        arguments=[ "--project_id", project_id ],
        file_outputs={ "output": '/output.txt' }
    )   
    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=[ "--project_id", project_id ]
    )
    data_preprocessor.after(data_collector)
    # TODO: add other components

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')

In the Python code for the data-collector.py component, I fetch the dataset and then write it to /output.txt. I'm able to read from the file within the same component, but not inside data-preprocessor.py, where I get a FileNotFoundError.
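Simplified, the component looks roughly like this (fetch_dataset here is a placeholder standing in for the actual fetching logic):

import argparse

def fetch_dataset(project_id):
    # placeholder for the real data-fetching logic
    return 'col_a,col_b\n1,2\n'

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--project_id')
    args = parser.parse_args()

    dataset = fetch_dataset(args.project_id)
    # /output.txt matches the path declared in file_outputs
    with open('/output.txt', 'w') as f:
        f.write(dataset)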

Is the use of file_outputs invalid for container-based Kubeflow components, or am I using it incorrectly in my code? If it's not an option in my case, is it possible to programmatically create Kubernetes volumes inside the pipeline declaration Python code and use them instead of file_outputs?

Ash asked Mar 03 '23

2 Answers

Files created in one Kubeflow pipeline component are local to that component's container. To reference a file in a subsequent step, you need to pass it like this:

data_preprocessor = dsl.ContainerOp(
    name='data preprocessor',
    image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
    arguments=["--fetched_dataset", data_collector.outputs['output'],
               "--project_id", project_id,
              ]
)

Note: data_collector.outputs['output'] will contain the actual string contents of the file /output.txt (not a path to the file). If you want it to contain a path instead, write the dataset to shared storage (such as S3 or a mounted PVC volume) and write the path/link to that storage to /output.txt. data_preprocessor can then read the dataset based on the path.
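For example, a rough sketch of that pattern using Google Cloud Storage (the bucket name and both helper functions here are illustrative, not part of the original code):

from google.cloud import storage  # assumes google-cloud-storage is installed

BUCKET = 'my-demo-bucket'  # hypothetical bucket name

# data-collector side: upload the dataset and write only its URI to
# /output.txt, so the downstream component receives a short string
def publish_dataset(local_path):
    blob = storage.Client().bucket(BUCKET).blob('dataset.csv')
    blob.upload_from_filename(local_path)
    with open('/output.txt', 'w') as f:
        f.write(f'gs://{BUCKET}/dataset.csv')

# data-preprocessor side: the URI arrives via the --fetched_dataset
# argument; download the real data before preprocessing
def load_dataset(gcs_uri, local_path='/tmp/dataset.csv'):
    bucket_name, blob_name = gcs_uri[len('gs://'):].split('/', 1)
    storage.Client().bucket(bucket_name).blob(blob_name).download_to_filename(local_path)
    return local_path

Passing only a URI this way also keeps the value exchanged between steps small, since file_outputs contents are passed around as plain strings.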

santiago92 answered May 16 '23

There are three main steps:

  1. Save an output.txt file containing the data/parameter/anything you want to pass to the next component. Note: it should be at the root level, i.e. /output.txt (a minimal sketch of a script that does this follows the example below).

  2. Pass file_outputs={'output': '/output.txt'} to the ContainerOp, as shown in the example below.

  3. Inside the pipeline function decorated with @dsl.pipeline, pass the output to the argument of the component that needs it as comp1.output (here comp1 is the first component, which produces the output and stores it in /output.txt).

import kfp
from kfp import dsl

def SendMsg(send_msg: str = 'akash'):
    # comp1: prints the message and writes it to /output.txt,
    # which is exposed to the pipeline via file_outputs
    return dsl.ContainerOp(
        name='Print msg',
        image='docker.io/akashdesarda/comp1:latest',
        command=['python', 'msg.py'],
        arguments=['--msg', send_msg],
        file_outputs={
            'output': '/output.txt',
        }
    )

def GetMsg(get_msg: str):
    # comp2: receives the first component's output as a plain argument
    return dsl.ContainerOp(
        name='Read msg from 1st component',
        image='docker.io/akashdesarda/comp2:latest',
        command=['python', 'msg.py'],
        arguments=['--msg', get_msg]
    )

@dsl.pipeline(
    name='Pass parameter',
    description='Passing a parameter between components')
def passing_parameter(send_msg):
    comp1 = SendMsg(send_msg)
    # comp1.output resolves to the contents of comp1's /output.txt
    comp2 = GetMsg(comp1.output)


if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')
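For reference, the msg.py baked into comp1's image must write the file declared in file_outputs. A minimal sketch (the actual image contents may differ):

# msg.py (sketch): echoes --msg and writes it to /output.txt,
# the path declared in file_outputs above
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--msg')
args = parser.parse_args()

print(args.msg)
with open('/output.txt', 'w') as f:
    f.write(args.msg)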
Akash Desarda answered May 16 '23