kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0
3.6k stars 1.62k forks source link

[bug] ParallelFor overwrite output artifacts at each iteration #6718

Closed pretidav closed 3 months ago

pretidav commented 3 years ago

What steps did you take

from kfp.v2 import dsl from kfp import dsl as v1dsl

Using v2 compatible pipeline, using a combination of
with v1dsl.SubGraph(parallelism=1): with dsl.ParallelFor(generate_task.output) as item:

where generate_task is a component with a dictionary as output (whose length is decided dynamically)

What happened:

Output artifact produced inside the loop are written using the same path generated at compilation time.

What did you expect to happen:

Artifacts from a given iteration should be saved and not overwritten by the subsequent iteration.

Environment:

Kubeflow Pipeline 1.6

Labels

/area frontend /area backend /area sdk /area components

Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

zijianjoy commented 3 years ago

cc @chensun

ozora-ogino commented 2 years ago

@pretidav Could you share the code to reproduce?

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

lightning-like commented 2 years ago

We have the same problem.

Do you have any ides how to fix it?

Could you share the code to reproduce?

I think we can easy to reproduce it

@component()
def create_list() -> list:
    return list(range(10))

@component()
def to_df(
    input_number: int,
    output_csv: Output[Dataset],
):
    import pandas as pd

    pd.DataFrame([input_number]).to_csv(output_csv.path)

@component()
def from_df(
    input_number: Input[Dataset],
) -> int:
    import pandas as pd

    return int(pd.read_csv(input_number.path).iloc[0, 0])

@pipeline(
    name="test_for",
    description=" test_for",
)
def parallel_for_test():
    list_step = create_list()

    with dsl.ParallelFor(list_step.output) as number:
        df_step = to_df(number)
        from_df_step = from_df(df_step.outputs["output_csv"])

here we have 10 times df_step and each time there are different inputs but output.path for all steps is the same [minio://mlpipeline/v2/artifacts/pipeline/test_for/10db378a-3320-4631-86ca-0cbe798e9cbe/to-df/output_csv](***/pipeline/artifacts/minio/mlpipeline/v2/artifacts/pipeline/test_for/10db378a-3320-4631-86ca-0cbe798e9cbe/to-df/output_csv?namespace=***)

GfxKai commented 1 year ago

Same issue here, we can't generate any artifacts within a ParallelFor loop because Kubeflow writes to the same URI for every iteration 🤦‍♂️

It also seems that if you set the name parameter on ParallelFor, the loop items (along with their progress, logs, artifacts and visualisations) vanish from the UI completely.

It's incredibly frustrating that such an important component is completely broken and unusable, with no warnings or documentation other than:

Setting parallelism is not yet supported by the KFP orchestration backend

Suggestion: from my initial digging, seems like some logic needs to be added to provisionOutputs in the backend driver to handle iterator context, similarly to how this case is handled in resolveInputs.

@chensun @connor-mccarthy any thoughts / ideas / updates? Happy to help if possible, this is a pretty significant blocker on our team 🥲

github-actions[bot] commented 4 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

alan-salimov-vai commented 2 months ago

/reopen

google-oss-prow[bot] commented 2 months ago

@alan-salimov-vai: You can't reopen an issue/PR unless you authored it or you are a collaborator.

In response to [this](https://github.com/kubeflow/pipelines/issues/6718#issuecomment-2292188140): >/reopen Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.