kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[sdk] Can't use pipeline's input object in dsl.Condition #8203

Closed ittus closed 3 months ago

ittus commented 1 year ago

Environment

Steps to reproduce

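from typing import Optional

from kfp import dsl  # KFP v1 SDK


@dsl.pipeline(name="Test pipeline", description=__doc__)
def test_pipeline(
    output_config: Optional[dict] = {
        "s3_bucket": "my-s3-bucket",
        "s3_prefix": "my-s3-prefix",
    }
) -> None:
    # `is not None` is evaluated by Python immediately and yields a plain bool
    with dsl.Condition(output_config is not None):
        ...  # run a container op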

Creating the pipeline then fails with this error:

    if isinstance(group.condition.operand1, dsl.PipelineParam):
AttributeError: 'bool' object has no attribute 'operand1'

If I change the condition to

    with dsl.Condition(output_config != None):

a different error appears when the pipeline runs:

This step is in Error state with this message: Invalid 'when' expression '"{"s3_bucket": "my-s3-bucket", "s3_prefix": "my-s3-prefix}" != "None"': Cannot transition token types from STRING [{] to VARIABLE [s3_bucket]

Expected result

Materials and reference

Labels

/area sdk



connor-mccarthy commented 1 year ago

Thanks for reporting this @ittus. This is a known bug.

sumanassarmathg commented 1 year ago

Any update on this? We'd like to use the dsl.Condition to trigger some alert ops and can't whilst this is open.

connor-mccarthy commented 1 year ago

This has not yet been fixed in KFP v1.

This was fixed at least as early as kfp==2.0.0b1. The following runs without issue:

import kfp

assert kfp.__version__ == '2.0.0b1'

from kfp import compiler
from kfp import dsl

@dsl.component
def identity(string: str) -> str:
    return string

@dsl.pipeline()
def my_pipeline(string: str = 'string'):
    with dsl.Condition(string == 'string'):
        op1 = identity(string=string)

ir_file = __file__.replace('.py', '.yaml')
compiler.Compiler().compile(pipeline_func=my_pipeline, package_path=ir_file)

MatthewRalston commented 1 year ago

That may run; however, anything beyond a trivial equality check (such as inspecting a file extension to decide whether a file needs to be decompressed) fails due to PipelineParameterChannel attribute issues. This seems related to #8626 and essentially prevents me from using Kubeflow Pipelines in a production setting.

connor-mccarthy commented 1 year ago

@MatthewRalston, thanks for your response. If I understand what you're describing correctly, that's independent from the use of pipeline input in dsl.Condition.

But yes, only a limited number of operators are permitted in dsl.Condition. Arbitrary user code is not permitted in dsl.Condition (or the pipeline body more generally) since the pipeline body defines the orchestration and will not actually be run at pipeline runtime.
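
To illustrate the point about the pipeline body not running at pipeline runtime, here is a toy sketch in plain Python. It does not use KFP: `ParamPlaceholder`, `toy_pipeline`, and `try_compile` are hypothetical names invented for this example. The only assumption is that the pipeline function is called once at compile time with placeholder objects in place of the real input values.

```python
class ParamPlaceholder:
    """Hypothetical stand-in for a pipeline input at compile time (not a KFP class)."""

    def __init__(self, name: str) -> None:
        self.name = name


def toy_pipeline(path) -> bool:
    # Works on a real str, but at compile time `path` is only a placeholder;
    # the actual value does not exist until the run starts.
    return path.endswith(".gz")


def try_compile(pipeline_func) -> str:
    """Call the pipeline body once with a placeholder, as a compiler might."""
    try:
        pipeline_func(ParamPlaceholder("path"))
    except AttributeError as err:
        return f"compile-time failure: {err}"
    return "ok"


result = try_compile(toy_pipeline)
print(result)
```

Under that assumption, string methods such as `endswith` are looked up on the placeholder rather than on the runtime value, which is one plausible source of the attribute errors described above. A common workaround is to move such logic into a component and condition on that component's output.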

xsqian commented 1 year ago

I get the same error message whenever the condition is not based on a PipelineParam, e.g.

a_local_variable=5
with dsl.Condition(a_local_variable == 5):
    pass

AttributeError: 'bool' object has no attribute 'operand1'
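
The traceback suggests that the compiler expects the condition to be an expression object with an `operand1` attribute rather than an already-evaluated `bool`. The sketch below reproduces that distinction in plain Python. `Param`, `ConditionOperator`, and `describe` are toy stand-ins written for this example and are not the SDK's implementation; the point is only that `==` and `!=` can be overloaded to return a deferred expression, whereas `is not None` and comparisons between ordinary Python values are evaluated on the spot.

```python
from typing import Any, NamedTuple


class ConditionOperator(NamedTuple):
    """Toy record of a deferred comparison (hypothetical, modeled on the traceback)."""
    operator: str
    operand1: Any
    operand2: Any


class Param:
    """Toy pipeline parameter whose comparisons build an expression object."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, other: Any) -> ConditionOperator:  # type: ignore[override]
        return ConditionOperator("==", self, other)

    def __ne__(self, other: Any) -> ConditionOperator:  # type: ignore[override]
        return ConditionOperator("!=", self, other)


def describe(condition: Any) -> str:
    """Mimic a compiler step that needs an expression with `operand1`."""
    if not hasattr(condition, "operand1"):
        kind = type(condition).__name__
        return f"error: {kind!r} object has no attribute 'operand1'"
    return f"deferred: {condition.operand1.name} {condition.operator} {condition.operand2!r}"


param = Param("output_config")
a_local_variable = 5

from_param = describe(param != None)          # overloaded: stays symbolic
from_identity = describe(param is not None)   # `is` cannot be overloaded: bool
from_local = describe(a_local_variable == 5)  # plain ints: bool
print(from_param, from_identity, from_local, sep="\n")
```

In this toy model, only the first condition reaches the compiler as something it can translate; the other two have already collapsed to `True` before `describe` sees them, which matches the shape of both errors reported in this thread.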

github-actions[bot] commented 5 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 months ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.