kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[feature] Support custom serialization of objects for Python components #6304

Open alanhdu opened 2 years ago

alanhdu commented 2 years ago

Feature Area

/area sdk
/area components

What feature would you like to see?

Right now, Python function-based components can only accept and return a fairly limited set of types (essentially JSON, with some special-case support for "file-like" objects / paths). It'd be great if we could plug in support for a richer set of types, where the serialization/deserialization happens outside of the Python component.

What is the use case or pain point?

We are currently trying to use Kubeflow Pipelines as part of our research workflow. As part of this, we have scientists (who are not well-versed in Argo or Kubernetes) writing Python function-based components. A lot of the research code, however, requires "complicated objects" -- whether those are NumPy arrays, Pandas DataFrames, or even just sophisticated dataclasses for "structured" configuration. These are not natively supported by Kubeflow Pipelines, but all of them have straightforward serialization/deserialization methods we can provide to and from strings (e.g. a Pandas DataFrame can be represented by an S3 URI pointing to a CSV that we serialize/deserialize from). Right now, scientists have to write that boilerplate themselves, which takes a lot of time.

Concretely, it'd be nice if we could turn something like:

# Old way
import cattr
import pandas as pd

import kfp.components as comp


def run(config: dict):
    config = cattr.structure(config, Config)
    ...

def aggregate(metrics_path: comp.InputPath(str), output_path: comp.OutputPath(str)):
    df = pd.read_csv(metrics_path)
    output = process(df)
    output.to_csv(output_path)

# New way
def run(config: Config):
    ...  # automatically serialized/deserialized from JSON

def aggregate(metrics: pd.DataFrame) -> pd.DataFrame:
    ...  # path munging happens automatically

Is there a workaround currently?

I haven't implemented these yet, but I could see a couple of implementation options:

  1. Hook into the converters at https://github.com/kubeflow/pipelines/blob/a6ab4e4411dcb700e751c458f47d69857a65ee7a/sdk/python/kfp/components/_data_passing.py#L109-L125 to add special serialization + deserialization hooks for the relevant classes (a rough sketch of such a hook pair is included after option 2 below).

  2. Implement a special Json generic type and a @to_json decorator that basically does:

@to_json
def run(config: Json[Config]):
    pass

Json[T] would implement the to_dict function to register itself as just JSON from Kubeflow's POV; to_json would handle serializing/deserializing the Json[T] to and from T based on the type annotation.
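
To make that concrete, here's a minimal, KFP-agnostic sketch of what to_json could look like. Everything in it -- Json, to_json, the example Config -- is illustrative rather than an existing KFP API; the real work would be making Json[T] surface to the SDK as plain JSON.

import functools
import inspect
import json
from dataclasses import dataclass
from typing import Generic, TypeVar, get_args, get_origin, get_type_hints

import cattr

T = TypeVar("T")


class Json(Generic[T]):
    """Marker annotation: the wrapped value crosses the component boundary as a JSON string."""


def to_json(func):
    """Structure Json[...] parameters from JSON strings before calling func and,
    if the return annotation is Json[...], unstructure the result back to JSON."""
    sig = inspect.signature(func)
    hints = get_type_hints(func)
    return_hint = hints.pop("return", None)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            hint = hints.get(name)
            if hint is not None and get_origin(hint) is Json:
                (inner,) = get_args(hint)  # the T in Json[T]
                bound.arguments[name] = cattr.structure(json.loads(value), inner)
        result = func(*bound.args, **bound.kwargs)
        if return_hint is not None and get_origin(return_hint) is Json:
            result = json.dumps(cattr.unstructure(result))
        return result

    return wrapper


# Example usage with a hypothetical Config dataclass:
@dataclass
class Config:
    learning_rate: float
    epochs: int


@to_json
def run(config: Json[Config]):
    print(config.learning_rate, config.epochs)


run('{"learning_rate": 0.01, "epochs": 3}')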
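
And for option 1, the per-class hooks would just be a serializer/deserializer pair. A rough sketch for Pandas DataFrames is below; only the hook functions are shown, since I haven't confirmed how they'd be registered with the converter list in _data_passing.py.

import tempfile

import pandas as pd


def serialize_dataframe(df: pd.DataFrame) -> str:
    # Write the DataFrame somewhere durable (here a local CSV; in our setup
    # this would be an S3 URI) and pass the location around as a string.
    path = tempfile.NamedTemporaryFile(suffix=".csv", delete=False).name
    df.to_csv(path, index=False)
    return path


def deserialize_dataframe(path: str) -> pd.DataFrame:
    # Read the DataFrame back from the location the upstream step produced.
    return pd.read_csv(path)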

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

geier commented 1 year ago

Code seems to have moved to https://github.com/kubeflow/pipelines/blob/f167e15c3ca9b2120f8bb2283b0f3974439fef2f/sdk/python/kfp/components/types/type_utils.py

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.