kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[sdk] Error compiling with ParallelFor and PipelineParam #10592

Open jsilva opened 5 months ago

jsilva commented 5 months ago

### Environment

- kfp 2.7.0
- kfp-pipeline-spec 0.3.0
- kfp-server-api 2.0.5


### Steps to reproduce
Using the following script, I get an unexpected error:

```python
from kfp.v2 import dsl, compiler

@dsl.component()
def print_op(message: str, shard: int) -> None:
    print(f"Shard: {shard}")
    print(f"Message: {message}")

@dsl.pipeline()
def my_pipeline(
        processing_date: str = "2021-01-01",
):
    shards = [1, 2, 3]
    with dsl.ParallelFor(shards, parallelism=len(shards)) as shard:
        one = print_op(message=processing_date, shard=shard)

    two = print_op(message=processing_date, shard=1).after(one)

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline.yaml"
)
```

Alternatively, adding the following test

    def test_valid_parallelfor_5(self):

        @dsl.pipeline
        def my_pipeline(test_arg: str = 'foo'):
            with dsl.ParallelFor([1, 2, 3]):
                one = print_and_return(text=test_arg)

            # refers to all instances of one
            two = print_and_return(text=test_arg).after(one)

        self.assertTrue(my_pipeline.pipeline_spec)

to the Python SDK's `compiler_test.py` will reproduce the same error.

With the above I get the following error:


/Users/xxxxxxxx/work/mlops/wanna/wanna-customers/wanna-kraken/quick_kfp.py:1: DeprecationWarning: The module `kfp.v2` is deprecated and will be removed in a futureversion. Please import directly from the `kfp` namespace, instead of `kfp.v2`.

  from kfp.v2 import dsl, compiler
/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/component_decorator.py:119: FutureWarning: Python 3.7 has reached end-of-life. The default base_image used by the @dsl.component decorator will switch from 'python:3.7' to 'python:3.8' on April 23, 2024. To ensure your existing components work with versions of the KFP SDK released after that date, you should provide an explicit base_image argument and ensure your component works as intended on Python 3.8.
  return component_factory.create_component_from_func(
Traceback (most recent call last):
  File "/Users/xxxxxxxx/work/mlops/wanna/wanna-customers/wanna-kraken/quick_kfp.py", line 10, in <module>
    def my_pipeline(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/graph_component.py", line 68, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1909, in create_pipeline_spec
    dependencies = compiler_utils.get_dependencies(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/compiler/compiler_utils.py", line 762, in get_dependencies
    upstream_tasks_that_downstream_consumers_from = [
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/compiler/compiler_utils.py", line 763, in <listcomp>
    channel.task.name for channel in task._channel_inputs

AttributeError: 'NoneType' object has no attribute 'name'

### Expected result

  1. pipeline.yaml available on disk ready for JobSubmit

### Materials and Reference

This issue is related to the changes in https://github.com/kubeflow/pipelines/pull/10257. Adding an `if channel.task` check at https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/compiler/compiler_utils.py#L763 makes the above test pass.


Impacted by this bug? Give it a 👍.

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

HumairAK commented 3 months ago

This looks like it's still relevant.

jsilva commented 2 months ago

This bug is still present in SDK 2.2.0.

agkphysics commented 1 month ago

Since presumably only tasks need to be checked here, pipeline parameters can probably be ignored: https://github.com/kubeflow/pipelines/blob/9cb5913d4757132e2c6e5f1c5fa2cdb7ad541f97/sdk/python/kfp/compiler/compiler_utils.py#L762-L765

Like so:

upstream_tasks_that_downstream_consumers_from = [
    channel.task.name for channel in task._channel_inputs if channel.task is not None
]
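
To illustrate why the guard helps, here is a minimal sketch that does not import kfp at all. `FakeTask` and `FakeChannel` are hypothetical stand-ins, not real SDK classes; the only assumption carried over from the traceback is that a channel backed by a pipeline parameter has `task` set to `None`, while a channel produced by an upstream task points at that task.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeChannel:
    """Hypothetical stand-in for a channel.

    `task` is None when the value comes from a pipeline parameter
    instead of an upstream task (assumption based on the traceback).
    """
    name: str
    task: Optional["FakeTask"] = None


@dataclass
class FakeTask:
    """Hypothetical stand-in for a pipeline task."""
    name: str
    _channel_inputs: List[FakeChannel] = field(default_factory=list)


def upstream_names_unguarded(task: FakeTask) -> List[str]:
    # Mirrors the list comprehension shown in the traceback.
    return [channel.task.name for channel in task._channel_inputs]


def upstream_names_guarded(task: FakeTask) -> List[str]:
    # Same comprehension, skipping channels that have no producing task.
    return [
        channel.task.name
        for channel in task._channel_inputs
        if channel.task is not None
    ]


producer = FakeTask(name="print-op")
consumer = FakeTask(
    name="print-op-2",
    _channel_inputs=[
        FakeChannel(name="processing_date"),        # pipeline parameter
        FakeChannel(name="Output", task=producer),  # upstream task output
    ],
)

try:
    upstream_names_unguarded(consumer)
except AttributeError as err:
    print(f"unguarded version fails: {err}")

print(upstream_names_guarded(consumer))
```

In this sketch the unguarded version raises `AttributeError` on the parameter-backed channel, and the guarded version returns only the names of real upstream tasks. Whether skipping parameter channels is safe everywhere `get_dependencies` is used would still need to be confirmed against the SDK's own tests.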