kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

When a pipeline that returns a NamedTuple is used inside another pipeline (nested pipelines), how do I access the outputs of the inner pipeline? #9653

Closed. Princy-Malhotra closed this issue 9 months ago.

Princy-Malhotra commented 1 year ago

Environment

Steps to reproduce

Here is the code I am trying to run:

from kfp import dsl
from typing import NamedTuple
from kfp import compiler
from kfp.dsl import component, Input, Output, Dataset

@component(
        base_image='python:3.11',
        packages_to_install=['pandas','numpy','scikit-learn']
)
def fetch_process_data(processed_X: Output[Dataset], processed_y: Output[Dataset]):
    from sklearn.datasets import fetch_openml
    import pandas as pd

    df, y = fetch_openml("titanic", version=1, as_frame=True, return_X_y=True)
    # Preprocess the data

    df.to_csv(processed_X.path, index=False)
    y.to_csv(processed_y.path, index=False)

@component(
        base_image='python:3.11',
        packages_to_install=['pandas','scikit-learn']
)
def test_train_split(
    fetched_X: Input[Dataset],
    fetched_Y: Input[Dataset],
    X_train: Output[Dataset],
    X_test: Output[Dataset],
    y_train: Output[Dataset],
    y_test: Output[Dataset],
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # read_csv already returns a DataFrame, so no extra pd.DataFrame(...) wrapper is needed
    df = pd.read_csv(fetched_X.path)
    y = pd.read_csv(fetched_Y.path)

    X_train1, X_test1, y_train1, y_test1 = train_test_split(df, y, train_size=0.8)

    X_train1.to_csv(X_train.path, index=False)
    X_test1.to_csv(X_test.path, index=False)
    y_train1.to_csv(y_train.path, index=False)
    y_test1.to_csv(y_test.path, index=False)

@component(
        base_image='python:3.11',
        packages_to_install=['pandas','scikit-learn']
)
def dec_tree(X_train: Input[Dataset], y_train: Input[Dataset]) -> float:
    import pandas as pd

    X_train = pd.read_csv(X_train.path)
    y_train = pd.read_csv(y_train.path)

    # Fit a decision tree model and return its accuracy
    # (elided in this report; the function is declared to return a float)

@dsl.pipeline(
        name='process_data'
)
def process_data() -> NamedTuple('Outputs', [('pro_X', Dataset), ('pro_y', Dataset)]):
    from collections import namedtuple

    ans1 = fetch_process_data()
    process_out = namedtuple('Outputs', ['pro_X', 'pro_y'])
    return process_out(ans1.outputs['processed_X'], ans1.outputs['processed_y'])

@dsl.pipeline(
        name='pipe_main',
        description='An example pipeline',
        )
def pipe_main():
    ans = process_data()
    split_data = test_train_split(fetched_X=ans.outputs['pro_X'], fetched_Y=ans.outputs['pro_y'])
    accu = dec_tree(X_train=split_data.outputs['X_train'], y_train=split_data.outputs['y_train'])

if __name__ == '__main__':
    print('entry')
    compiler.Compiler().compile(pipeline_func=pipe_main, package_path='p3y.yaml')

I then uploaded the YAML file to my local Kubeflow Pipelines deployment (localhost) and created a run.
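For reference, the output keys that pipe_main reads through ans.outputs['pro_X'] and ans.outputs['pro_y'] are the field names declared in the NamedTuple return annotation of process_data, so the namedtuple built in its body should use exactly the same field names. The sketch below is a local, stdlib-only sanity check of that correspondence and assumes nothing about how the KFP compiler validates it internally. Dataset here is a hypothetical placeholder class (not kfp.dsl.Dataset), the string values stand in for the real task outputs, and output_names and check_return_fields are hypothetical helpers.

```python
from collections import namedtuple
from typing import NamedTuple


class Dataset:
    """Hypothetical placeholder for kfp.dsl.Dataset so the sketch runs without KFP."""


# Same return annotation shape as the process_data pipeline in the issue.
def process_data() -> NamedTuple('Outputs', [('pro_X', Dataset), ('pro_y', Dataset)]):
    process_out = namedtuple('Outputs', ['pro_X', 'pro_y'])
    # Strings stand in for ans1.outputs['processed_X'] and ans1.outputs['processed_y'].
    return process_out('processed_X', 'processed_y')


def output_names(func) -> tuple:
    """Hypothetical helper: the output keys declared by a NamedTuple return annotation."""
    return func.__annotations__['return']._fields


def check_return_fields(func) -> dict:
    """Hypothetical helper: call func and map each declared output key to its value,
    raising ValueError if the returned tuple's field names differ from the annotation."""
    declared = output_names(func)
    result = func()
    returned = getattr(result, '_fields', None)
    if returned != declared:
        raise ValueError(f'returned fields {returned} do not match declared {declared}')
    return dict(zip(declared, result))


print(output_names(process_data))         # ('pro_X', 'pro_y')
print(check_return_fields(process_data))  # {'pro_X': 'processed_X', 'pro_y': 'processed_y'}
```

In the code above the names already match, so this check passes; it only rules out a field-name mismatch as a cause, and a body that built namedtuple('Outputs', ['X', 'y']) instead would raise ValueError here.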

Expected result

The run should complete successfully. Instead, it fails partway through, at the test-train-split component. I suspect the problem lies in how the outputs of the process_data() pipeline are passed to test-train-split, but I have not been able to find the error. A screenshot of the run is attached below:

[Screenshot: the failed run]

The sub-DAG for the process_data pipeline: [screenshot]

The logs do not show any error, only warnings.

Materials and reference

I followed the official Kubeflow Pipelines v2 documentation: https://www.kubeflow.org/docs/components/pipelines/v2/

Labels

/area sdk



chensun commented 1 year ago

Hi @Princy-Malhotra, if you click the test-train-split component, do you see any logs?

Princy-Malhotra commented 1 year ago


No @chensun, there are no logs for that component; the log view is empty. Screenshots attached: [screenshot] [Screenshot from 2023-06-25 05-30-31]

Princy-Malhotra commented 1 year ago

Hi @chensun, have you found a solution for this? (Asking because it's fairly important for me.)

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 9 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

vamsi-01 commented 4 months ago

/reopen

I defined a NamedTuple (say, DataOutput) and used it as the return type of the component's function, but when the task executes it fails with "NameError: name 'DataOutput' is not defined". The same code works with KFP v1 but fails in v2.

Any help here?
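On the NameError question: as far as I understand, a KFP v2 lightweight Python component is executed from the source of the decorated function itself rather than from the whole module it was defined in, so a NamedTuple class such as DataOutput defined at module level is not in scope inside the container. A workaround that fits this model is to spell the NamedTuple out inline in the return annotation and rebuild it inside the function body. The stdlib-only sketch below imitates that situation by exec-ing only a function's source in a fresh namespace; PREAMBLE, load_function, and split_data are hypothetical, and the preamble is an assumption standing in for whatever imports KFP actually places ahead of the function.

```python
from typing import Callable

# Assumption: stands in for the imports placed ahead of the function source at runtime;
# the real KFP-generated preamble may differ.
PREAMBLE = "from typing import NamedTuple\n"


def load_function(source: str, name: str) -> Callable:
    """Hypothetical helper: exec only the function's source in a fresh namespace,
    roughly imitating a container that never sees the rest of the module."""
    namespace: dict = {}
    exec(PREAMBLE + source, namespace)
    return namespace[name]


# Relies on DataOutput being defined elsewhere in the author's module.
BROKEN = '''
def split_data() -> DataOutput:
    return DataOutput(train=1, test=2)
'''

# Self-contained: the NamedTuple is spelled out inline and rebuilt inside the body.
FIXED = '''
def split_data() -> NamedTuple('DataOutput', [('train', int), ('test', int)]):
    from typing import NamedTuple
    DataOutput = NamedTuple('DataOutput', [('train', int), ('test', int)])
    return DataOutput(train=1, test=2)
'''

try:
    # Raises at definition (annotation evaluation) or at call time, depending on the Python version.
    load_function(BROKEN, 'split_data')()
except NameError as err:
    print(err)  # name 'DataOutput' is not defined

print(load_function(FIXED, 'split_data')())  # DataOutput(train=1, test=2)
```

This only illustrates why a module-level name can be missing at runtime under this execution model; it is not the code KFP itself runs.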

google-oss-prow[bot] commented 4 months ago

@vamsi-01: You can't reopen an issue/PR unless you authored it or you are a collaborator.
