kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[feature] Directly transfer dataset from GCP bucket to minio #10535

Closed amarion35 closed 5 months ago

amarion35 commented 6 months ago

Feature Area

/area components

What feature would you like to see?

Maybe this feature already exists but I couldn't find it.

The first component of our pipelines is often a component that just downloads a dataset (folders and files) from a GCP bucket into a dsl.Dataset artifact.

In our case, the component looks like this:

@dsl.component(base_image="python:3.11", packages_to_install=["google-cloud-storage"])
def read_dataset_op(dataset_uri: str, dataset: dsl.Output[dsl.Dataset]) -> None:
    """Read a dataset."""
    import os
    from pathlib import Path
    import logging
    from concurrent.futures import ThreadPoolExecutor
    from typing import Iterator
    import google.cloud.storage
    import google.cloud.storage.blob

    logger = logging.getLogger(__name__)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    logger.info("Reading dataset from %s", dataset_uri)

    # Download all the files in the dataset
    client = google.cloud.storage.Client()
    bucket_name, blob_name = dataset_uri.replace("gs://", "").split("/", 1)
    logger.info("bucket_name: %s, blob_name: %s", bucket_name, blob_name)
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=blob_name)
    os.makedirs(dataset.path, exist_ok=True)

    def download_blob(blob: google.cloud.storage.blob.Blob, output_path: Path) -> None:
        try:
            blob_name: str = blob.name
            filename = blob_name.split("/")[-1]
            logger.info("Downloading %s to %s", blob.name, output_path / filename)
            blob.download_to_filename(filename=str(output_path / filename))
            logger.info("Downloaded %s", filename)
        except Exception as e:
            logger.exception(e)
            raise e

    def download_blobs(
        blobs: Iterator[google.cloud.storage.blob.Blob], output_path: Path
    ) -> None:
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(download_blob, blob, output_path) for blob in blobs]
        # Surface worker errors: submit() alone swallows exceptions raised in threads.
        for future in futures:
            future.result()

    try:
        download_blobs(blobs, Path(dataset.path))
    except Exception as e:
        logger.exception(e)
        raise e

Once the component completes, the artifact is uploaded to minio. So the dataset is transferred twice: once from the GCP bucket to the pod and once from the pod to minio. Why not just do it in a single transfer?

What is the use case or pain point?

Doing this in a single transfer would make the caching of datasets faster. The functionality could be encapsulated into a specific artifact or into a Kubeflow component, so we wouldn't have to rewrite this component.

Is there a workaround currently?

Not using the Dataset artifact and instead downloading the dataset directly in the components that need it, using artifacts only for intermediate (preprocessing) datasets.
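
For illustration, a minimal sketch of that workaround (the train_op component, pipeline name, and bucket path are hypothetical): the GCS URI is passed around as a plain string parameter and each consuming component downloads the files itself, so no Dataset artifact is created for the raw data.

from kfp import dsl

@dsl.component(base_image="python:3.11", packages_to_install=["google-cloud-storage"])
def train_op(dataset_uri: str) -> None:
    """Hypothetical consumer that downloads the raw dataset itself."""
    from pathlib import Path
    import google.cloud.storage

    client = google.cloud.storage.Client()
    bucket_name, prefix = dataset_uri.replace("gs://", "").split("/", 1)
    local_dir = Path("/tmp/dataset")
    local_dir.mkdir(parents=True, exist_ok=True)
    for blob in client.get_bucket(bucket_name).list_blobs(prefix=prefix):
        if blob.name.endswith("/"):  # skip folder placeholder objects
            continue
        blob.download_to_filename(str(local_dir / blob.name.split("/")[-1]))
    # ...train on the files in local_dir...

@dsl.pipeline(name="workaround-pipeline")
def workaround_pipeline(dataset_uri: str):
    train_op(dataset_uri=dataset_uri)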


Love this idea? Give it a 👍.

rimolive commented 6 months ago

Looks like the same case as in this sample: https://github.com/kubeflow/pipelines/blob/master/samples/v2/pipeline_with_importer.py
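
For reference, the core of that sample is dsl.importer, which (as I understand it) registers an existing URI as an artifact rather than copying the data through the pod. A minimal sketch, with a hypothetical consumer component and bucket path:

from kfp import dsl

@dsl.component(base_image="python:3.11")
def consume_dataset_op(dataset: dsl.Input[dsl.Dataset]) -> None:
    """Hypothetical consumer: inspect the imported artifact."""
    print("artifact uri:", dataset.uri)
    print("local path:", dataset.path)

@dsl.pipeline(name="pipeline-with-importer")
def importer_pipeline(dataset_uri: str = "gs://bucket/path/to/dataset.csv"):
    # Wrap the existing GCS object as a Dataset artifact; the importer
    # task itself does not download or re-upload the data.
    importer_task = dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=dsl.Dataset,
        reimport=False,
    )
    consume_dataset_op(dataset=importer_task.output)

Whether a downstream component can read the data at dataset.path (as opposed to dataset.uri) depends on how your cluster resolves gs:// URIs, so that is worth verifying in your environment.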

amarion35 commented 6 months ago

According to this doc, the importer is used to import existing artifacts. But it also says that it can use an external file that was not generated by a pipeline as an artifact. Does that mean it can import any file or folder from a GCS bucket?

https://www.kubeflow.org/docs/components/pipelines/v2/components/importer-component/

rimolive commented 6 months ago

I am testing this sample in a different environment right now, but I'd like to highlight this part of the doc:

"If you wish to use an existing artifact that was not generated by a task in the current pipeline or wish to use as an artifact an external file that was not generated by a pipeline at all..."

So according to this, yes, it is possible to inject external files into your pipeline as artifacts.

rimolive commented 6 months ago

Did you have the chance to look at it?

rimolive commented 5 months ago

Closing this issue. No activity.

/close

google-oss-prow[bot] commented 5 months ago

@rimolive: Closing this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/10535#issuecomment-2018188437):

> Closing this issue. No activity.
>
> /close

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.