kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[backend] ParallelFor does not work on beta.0 - multiple issues #8990

Closed b4sus closed 3 months ago

b4sus commented 1 year ago

Environment

Steps to reproduce

A simple pipeline defined like this:

from kfp import dsl

@dsl.component
def addition_component(num1: int, num2: int) -> int:
    print('hello from component')
    return num1 + num2

@dsl.pipeline(name='addition-pipeline')
def my_pipeline(a: int, b: int):
    add_task_1 = addition_component(num1=a, num2=b)

    with dsl.ParallelFor(items=[1, 2], parallelism=2) as item:
        add_task_2 = addition_component(num1=add_task_1.output, num2=item)
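
For completeness, this is roughly how the pipeline is compiled and submitted (a sketch; the client host is an assumption):

from kfp import compiler
from kfp.client import Client

compiler.Compiler().compile(my_pipeline, 'pipeline.yaml')
client = Client(host='http://localhost:8080')  # assumption: port-forwarded KFP API endpoint
client.create_run_from_pipeline_package('pipeline.yaml', arguments={'a': 1, 'b': 2})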

When running the pipeline, I get the following error:

Failed to create a new v1beta1 run: InternalServerError: Failed to validate workflow for (): spec.templates[5].name: 'comp-tasksgrouptype-for-loop-2-TasksGroupType.FOR-LOOP-2' is invalid: name must consist of alpha-numeric characters or '-', and must start with an alpha-numeric character (e.g. My-name1-2, 123-NAME)

As a workaround, I can compile the pipeline, change TasksGroupType.FOR-LOOP-2 to TasksGroupType-FOR-LOOP-2 in the YAML, and upload the patched pipeline. However, when I run it, I get another error, this time from kfp-driver:

F0315 08:09:30.260370 19 main.go:74] KFP driver: failed to unmarshal component spec, error: unknown field "iteratorPolicy" in ml_pipelines.PipelineTaskSpec

As a further workaround, I can remove the parallelism, i.e. the iteratorPolicy, from the compiled pipeline YAML (meaning all iterations will run at once); after uploading and running again, the pipeline runs successfully.
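
For reference, a minimal sketch of both hand edits applied to the compiled spec as plain-text patches (the exact layout of the compiled YAML may differ):

# sketch: patch the compiled 'pipeline.yaml' as described above (workaround only)
import re

with open('pipeline.yaml') as f:
    spec = f.read()

# 1) make the generated template name valid for Argo ('.' -> '-')
spec = spec.replace('TasksGroupType.FOR-LOOP', 'TasksGroupType-FOR-LOOP')

# 2) drop the iteratorPolicy block the driver cannot unmarshal;
#    this removes the parallelism limit, so all iterations run at once
spec = re.sub(r'\n\s*iteratorPolicy:\n\s*parallelismLimit:\s*\d+', '', spec)

with open('pipeline.yaml', 'w') as f:
    f.write(spec)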

However, in my other pipeline, the tasks in the parallel context get their input not as a parameter but as an artifact, like this:

from kfp import dsl

# read_data and train_bucket are components defined elsewhere
@dsl.pipeline(name='train-pipeline')
def train_pipeline(filename: str):
    data_reader_task = read_data(filename=filename)
    with dsl.ParallelFor(
        items=list(range(8, 18)),
        parallelism=2
    ) as bucket:
        task = train_bucket(data=data_reader_task.outputs['output'], bucket=bucket)

With this pipeline, there is another error from kfp-driver:

F0315 12:02:29.585512 18 main.go:74] KFP driver: driver.Container(pipelineName=pipeline/train-pipeline, runID=39e035a8-baac-4a4c-a5cf-65360d66665f, task="train_bucket", component="comp-train-bucket", dagExecutionID=115, componentSpec) failed: failed to resolve inputs: failed to resolve input artifact data with spec component_input_artifact:"pipelinechannel--read-data-output": component input artifact not implemented yet 

Does that mean that, for now, tasks in a parallel context cannot receive artifacts from other tasks, only parameters? Is there a plan to fix this?
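
Until that is implemented, one conceivable workaround (a sketch only, not an official pattern; the component names, shared storage, and URI scheme are all assumptions) is to pass the data location as a plain string parameter, since parameters do resolve inside ParallelFor:

from kfp import dsl

@dsl.component
def read_data_to_uri(filename: str) -> str:
    # assumption: components share storage (e.g. an object store); write
    # the processed data there and return its location as a parameter
    uri = f'gs://my-bucket/processed/{filename}'  # hypothetical path
    # ... write the data to uri ...
    return uri

@dsl.component
def train_bucket_from_uri(data_uri: str, bucket: int):
    # load the data from the URI inside the component instead of
    # receiving it as an Artifact input
    ...

@dsl.pipeline(name='train-pipeline-param-workaround')
def train_pipeline_param(filename: str):
    reader = read_data_to_uri(filename=filename)
    with dsl.ParallelFor(items=list(range(8, 18)), parallelism=2) as bucket:
        train_bucket_from_uri(data_uri=reader.output, bucket=bucket)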

Expected result

A pipeline with ParallelFor should run out of the box, respecting the parallelism parameter, and tasks in the parallel context should be able to accept artifacts.


Impacted by this bug? Give it a 👍.

chensun commented 1 year ago

Hi @b4sus, which Python version are you using? I think this might be a known incompatibility issue with Python 3.11, and we have a fix that should be available in the next SDK release.

b4sus commented 1 year ago

Hey @chensun, I am indeed using 3.11. I've just tried 3.10 as the base image, and the behavior was the same: all errors. I don't see how this could be related to the Python version; the first two problems are YAML-related:

  1. Argo validation failing for the name - coming from here
  2. fails to unmarshal the YAML spec
  3. fails to resolve the artifact

Points 2 and 3 fail in the kfp-driver image, nothing to do with Python IMO.

Why do you think it is related to the Python version?

b4sus commented 1 year ago

Tested with beta.1; the first two problems are gone. The remaining issue is component input artifact not implemented yet, so ParallelFor is still unusable.

github-actions[bot] commented 10 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

hoangtan96dl commented 9 months ago

Environment

I'm having the same problem, component input artifact not implemented yet, with ParallelFor while trying the advanced ML pipeline example from https://www.kubeflow.org/docs/components/pipelines/v2/installation/quickstart/:

comp-for-loop-1:
    dag:
      tasks:
        train-model:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-train-model
          inputs:
            artifacts:
              normalized_iris_dataset:
                componentInputArtifact: pipelinechannel--normalize-dataset-normalized_iris_dataset
            parameters:
              n_neighbors:
                componentInputParameter: pipelinechannel--neighbors-loop-item
          taskInfo:
            name: train-model
    inputDefinitions:
      artifacts:
        pipelinechannel--normalize-dataset-normalized_iris_dataset:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--neighbors:
          parameterType: LIST
        pipelinechannel--neighbors-loop-item:
          parameterType: NUMBER_INTEGER

I think the problem is that the compiler creates a separate sub-pipeline for a ParallelFor loop, and I have not seen any pipeline examples that receive an Artifact as an input and pass it on to a component like these lines do:

          inputs:
            artifacts:
              normalized_iris_dataset:
                componentInputArtifact: pipelinechannel--normalize-dataset-normalized_iris_dataset

So if there is any working example that follows this pattern, we could temporarily fix this by manually editing the pipeline YAML file.
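
To see whether a compiled pipeline is affected, here is a small sketch that scans the compiled spec for the componentInputArtifact pattern quoted above (assumes PyYAML and a compiled 'pipeline.yaml'):

import yaml

with open('pipeline.yaml') as f:
    spec = yaml.safe_load(f)

# list loop components whose tasks use componentInputArtifact, i.e. the
# pattern the driver rejects with 'component input artifact not implemented yet'
for comp_name, comp in spec.get('components', {}).items():
    for task_name, task in comp.get('dag', {}).get('tasks', {}).items():
        for art_name, art in task.get('inputs', {}).get('artifacts', {}).items():
            if 'componentInputArtifact' in art:
                print(f"{comp_name}/{task_name}: {art_name} <- "
                      f"{art['componentInputArtifact']}")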

github-actions[bot] commented 6 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.