kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

The dsl.ParallelFor loop variable can only be used in command-line #4145

Closed: theone4ever closed this issue 4 years ago

theone4ever commented 4 years ago

What steps did you take:

Trying to use the loop argument when instantiating dsl.ContainerOp, e.g. in the name of the ContainerOp, in file_outputs, etc. None of them works except arguments.

import kfp
from kfp import dsl

@dsl.pipeline(name='dynamic-dag-pipeline')
def dynamic_dag_output_pipeline(loopidy_doop:list=[1,2,3,4,5]):
    ops = {}
    ops['op0'] = dsl.ContainerOp(
        name="step-1",
        image="library/bash:4.4.23",
        command=["sh", "-c"],
        arguments=[f"echo 'First step'"]
    )

    with dsl.ParallelFor(loopidy_doop) as item:
        ops['op1'] = dsl.ContainerOp(
            name=f"step-1-{item}",                    # It works if I remove {item}
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=[f"echo input value item.b: {item} > /tmp/output.txt"],
            file_outputs={'output': f'/tmp/output-{item}.txt'}          # It works if I remove {item}
        ).after(ops['op0'])

        ops['op1'].set_display_name(f'step-1-{item}')  # It works if I remove {item} too

if __name__ == '__main__':
    pipeline_name = dynamic_dag_output_pipeline.__name__
    pipeline_path = __file__ + '.tar.gz'
    kfp.compiler.Compiler().compile(dynamic_dag_output_pipeline, pipeline_path)

What happened:

The KFP compilation fails with the error "RuntimeError: Internal compiler error: Found unresolved PipelineParam."

What did you expect to happen:

I should be able to use the loop argument when defining dsl.ContainerOp inside dsl.ParallelFor.

Environment:

kfp 0.5.1

How did you deploy Kubeflow Pipelines (KFP)? Standard deployment

KFP version:0.5.1

KFP SDK version: 0.5.1

Anything else you would like to add:

/kind bug

Bobgy commented 4 years ago

/assign @Ark-kun

Ark-kun commented 4 years ago

Hello @theone4ever, thank you for reaching out to us and reporting this issue. The behavior is mostly by design.

.after(ops['op0'])

You do not need to use .after if you have proper data dependencies (and you should have them with good components).

op = kfp.components.load_component_from_text('''
name: Process item
inputs:
- name: item
outputs:
- name: log
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh", '-c', 'mkdir -p "$(dirname "$1")"; echo "input value item.b: $0" > "$1"', {inputValue: item}, {outputPath: log}],
''')
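
For illustration, here is a minimal, untested sketch of how this could look. The 'Produce prefix' component and the extra prefix input on 'Process item' are my own additions, used only to show that consuming an upstream output as a regular input creates the dependency implicitly, with no .after needed:

import kfp
from kfp import dsl, components

# Hypothetical upstream component standing in for the 'First step' ContainerOp;
# it produces a small output that the loop body will consume.
produce_prefix_op = components.load_component_from_text('''
name: Produce prefix
outputs:
- name: prefix
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh', '-c', 'mkdir -p "$(dirname "$0")"; echo "First step" > "$0"', {outputPath: prefix}]
''')

# Variant of the "Process item" component above with an extra "prefix" input
# (my addition, only to demonstrate the data dependency).
process_item_op = components.load_component_from_text('''
name: Process item
inputs:
- name: prefix
- name: item
outputs:
- name: log
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh', '-c', 'mkdir -p "$(dirname "$2")"; echo "$0: input value item.b: $1" > "$2"', {inputValue: prefix}, {inputValue: item}, {outputPath: log}]
''')

@dsl.pipeline(name='dynamic-dag-pipeline')
def dynamic_dag_output_pipeline(loopidy_doop: list = [1, 2, 3, 4, 5]):
    produce_task = produce_prefix_op()
    with dsl.ParallelFor(loopidy_doop) as item:
        # Consuming produce_task's output creates the dependency implicitly;
        # no .after(...) is needed.
        process_item_op(prefix=produce_task.outputs['prefix'], item=item)

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(dynamic_dag_output_pipeline, __file__ + '.tar.gz')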
theone4ever commented 4 years ago

Hi @Ark-kun, thank you for the explanation, but there are still a few things I would like to understand better.

Everything works pretty well with ContainerOp alone, but when we introduce dsl.ParallelFor, we feel something is missing. Starting with the outputs:

Let me know if this makes sense to you.

Ark-kun commented 4 years ago
  • We would like to use either file_outputs (for small outputs) or output_artifact_paths (for large outputs) to declare the output of the parallelized task and also make the output visible in the Kubeflow dashboard.

For quite a long time file_outputs has had the same big data passing capabilities as output_artifact_paths. The maps are currently merged. We had a PR to deprecate output_artifact_paths since it's not needed anymore.

We want to pass these outputs to the next task, which is outside of the dsl.ParallelFor directive.

While it's easy to pass task outputs inside the loop, aggregating outputs of dsl.ParallelFor tasks is not currently supported by the system. All inputs (especially artifact inputs) must be formally and statically defined, but ParallelFor is dynamic. (BTW, did you know that you can use static loops? Just use python's for ... in ...: loop. Aggregating small outputs of static loops is possible.)
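
As a rough illustration of the static-loop idea (an untested sketch; the 'Consume logs' aggregator component is my own invention), a plain Python for loop is unrolled at compile time, so its small outputs can be collected and passed downstream:

import kfp
from kfp import dsl, components

# The "Process item" component from earlier in the thread.
process_item_op = components.load_component_from_text('''
name: Process item
inputs:
- name: item
outputs:
- name: log
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh', '-c', 'mkdir -p "$(dirname "$1")"; echo "input value item.b: $0" > "$1"', {inputValue: item}, {outputPath: log}]
''')

# Hypothetical aggregator component, for illustration only.
consume_logs_op = components.load_component_from_text('''
name: Consume logs
inputs:
- name: logs
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh', '-c', 'echo "aggregated: $0"', {inputValue: logs}]
''')

@dsl.pipeline(name='static-loop-pipeline')
def static_loop_pipeline():
    # A plain Python for loop is unrolled at compile time, so every task
    # and its outputs are statically known to the compiler.
    tasks = [process_item_op(item=item) for item in [1, 2, 3, 4, 5]]

    # Small outputs can be aggregated (here into a comma-separated string of
    # placeholders that the compiler resolves) and passed to a downstream task.
    consume_logs_op(logs=','.join(str(task.outputs['log']) for task in tasks))

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(static_loop_pipeline, __file__ + '.tar.gz')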

But to make it available in the dashboard, especially for large outputs with output_artifact_paths, it is hard to implement without using the loop argument (the item variable in this case).

The dashboard won't know about your persistent volume. The volume won't be mounted to the UX (dashboard) pod, and that's not even possible in many cases.

But to make it available in the dashboard

The local paths of output files do not matter. The files are grabbed and put into storage, so you do not need f'/tmp/output-{item}.txt' - any name will work. But it's better to use a proper component and use the generated names.

Let's step back and think about what you want to achieve.

  1. You want the component code to know where to write the data (e.g. a directory in a volume or a GCS directory URI). Solution: just formally pass that information in. Have an input called output_dir or output_dir_uri.

  2. You want the component code to know the item it needs to process. Solution: have an input for the item. You already pass the item in your example.

  3. You want the file name in the volume or bucket to depend on the {item}. Solution: just do that! Your component already has {item}, and you can use it for anything, for example to construct the bucket URI or the file name in the volume.

  4. You want the output data to be visible in the dashboard. Are you sure you want to see the actual data and not, for example, the URI/path to the data? If you want to see the data, then your component should have an output and you should write the data to the file path given to you for this output. If you want to see the URI in the dashboard, then you need an output for the URI and you need to write the URI to the provided file path.

Regarding set_display_name: it will be more readable in the dashboard.

Yes, I understand. This is a good idea.

Ark-kun commented 4 years ago

Check the component below, which copies the result data to a GCS bucket URI based on the loop item. Both the result and the result URI are outputs of the component and are visible in the dashboard.

name: Process item
inputs:
- name: item
- name: output_uri_prefix
outputs:
- name: result
- name: result_uri
implementation:
  container:
    image: google/cloud-sdk
    command:
    - sh
    - -ex
    - -c
    - |
      item=$0
      output_uri_prefix=$1
      result_path=$2
      result_uri_path=$3

      mkdir -p "$(dirname "$result_path")"
      mkdir -p "$(dirname "$result_uri_path")"

      echo "input value item.b: $item" > "$result_path"

      result_uri="${output_uri_prefix}/${item}"
      gsutil cp "$result_path" "$result_uri"
      echo "$result_uri" > "$result_uri_path"

    - {inputValue: item}
    - {inputValue: output_uri_prefix}
    - {outputPath: result}
    - {outputPath: result_uri}

With a volume you can just replace gsutil cp with cp.
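
A minimal usage sketch (untested; the file name, pipeline name, and gs:// prefix are placeholders) of wiring this component into dsl.ParallelFor:

import kfp
from kfp import dsl, components

# Assumes the component YAML above has been saved as 'process_item.component.yaml'.
process_item_op = components.load_component_from_file('process_item.component.yaml')

@dsl.pipeline(name='parallel-gcs-pipeline')
def parallel_gcs_pipeline(
    items: list = [1, 2, 3, 4, 5],
    output_uri_prefix: str = 'gs://my-bucket/parallel-results',  # placeholder bucket
):
    with dsl.ParallelFor(items) as item:
        # Each iteration writes its result to "<output_uri_prefix>/<item>";
        # both the result and result_uri outputs appear in the dashboard.
        process_item_op(item=item, output_uri_prefix=output_uri_prefix)

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(parallel_gcs_pipeline, __file__ + '.tar.gz')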

Does this suit your needs?

P.S. There was previously a syntax error in the component. Some may still remain - I have not tested it.

theone4ever commented 4 years ago

@Ark-kun thank you for the detailed answer; I will investigate your suggestions tomorrow. One quick piece of feedback, though: there is still a data size limitation when using file_outputs. A few days ago I tried a Parquet file and it triggered an error due to the size limitation, so yes, I would prefer to save the data in external storage and just provide a link in the dashboard.

Ark-kun commented 4 years ago

One quick piece of feedback, though: there is still a data size limitation when using file_outputs. A few days ago I tried a Parquet file and it triggered an error due to the size limitation, so yes, I would prefer to save the data in external storage

This should not be happening. Let's debug this.

BTW, the size error is usually caused by the downstream component, not the component producing the output.

All outputs are now produced as artifacts by default and support large data sizes. So, you should never see any size-related errors when you run a pipeline with a single component.

But it's still possible for a downstream task to try to consume that output as a value (a very easy mistake to make when you use ContainerOp instead of components), and that causes the output to also become a "parameter output", which does have size limitations. Even if it did not, the size of the command line is limited, so you cannot really put any big data there. The command line also cannot carry binary data - you cannot inject a Parquet file into the command line. I think this is what was happening to you.

When using components it's harder to make this mistake, as you always specify how you want to consume the data - as a value (inputValue) or as a file (inputPath) - so you make an explicit decision.
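
For illustration (my own untested sketch; the component names are made up), this is what the explicit choice looks like in a component spec - inputPath receives a local file path to the downloaded artifact, while inputValue puts the content itself on the command line:

from kfp import components

# Consumes the data as a file: the artifact is downloaded and a local path is
# injected, so large or binary data (e.g. a Parquet file) is fine.
consume_as_file_op = components.load_component_from_text('''
name: Consume as file
inputs:
- name: data
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh', '-c', 'wc -c "$0"', {inputPath: data}]
''')

# Consumes the data as a value: the content itself is placed on the command
# line, which turns the upstream output into a size-limited parameter.
consume_as_value_op = components.load_component_from_text('''
name: Consume as value
inputs:
- name: data
implementation:
  container:
    image: library/bash:4.4.23
    command: ['sh', '-c', 'echo "$0"', {inputValue: data}]
''')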

theone4ever commented 4 years ago

@Ark-kun thank you for the detailed explanation. I am happy with your suggestions and ideas, and in fact the ContainerOp -> component recommendation is a really good and important one. Much appreciated! Thus I am closing this issue.

Ark-kun commented 4 years ago

Thank you.

BTW, I've created a fix for the task_display_name issue. It will be merged and released soon. https://github.com/kubeflow/pipelines/issues/4163