ml6team / fondant

Production-ready data processing made easy and shareable
https://fondant.ai/en/stable/
Apache License 2.0

Create Vertex runner #393

Closed RobbeSneyders closed 1 year ago

RobbeSneyders commented 1 year ago

Create a runner that allows users to run Fondant pipelines on Vertex AI pipelines.

### Tasks
- [ ] https://github.com/ml6team/fondant/issues/412
- [ ] https://github.com/ml6team/fondant/issues/413
- [ ] https://github.com/ml6team/fondant/issues/414
- [ ] https://github.com/ml6team/fondant/issues/423
- [ ] https://github.com/ml6team/fondant/issues/422
- [ ] https://github.com/ml6team/fondant/issues/420
- [ ] https://github.com/ml6team/fondant/issues/419
- [ ] https://github.com/ml6team/fondant/issues/417
- [ ] https://github.com/ml6team/fondant/issues/416
- [ ] https://github.com/ml6team/fondant/issues/434
- [ ] https://github.com/ml6team/fondant/issues/437
- [ ] https://github.com/ml6team/fondant/issues/494
- [ ] https://github.com/ml6team/fondant/issues/495
- [ ] https://github.com/ml6team/fondant/pull/533

GeorgesLorre commented 1 year ago

Vertex runs Kubeflow pipelines in a more managed way. Since we already have a Kubeflow runner (and compiler), Vertex should be a logical next step.

For Vertex we should use the "new" Kubeflow Pipelines v2, which we currently do not support.

There are 3 options to make this work:

GeorgesLorre commented 1 year ago

Notes on generating Kubeflow component specs.

In kfpv1 a component spec looks something like this:

name: Add
description: |
    Component to add two numbers
inputs:
- name: op-1
  type: Integer
- name: op2
  type: Integer
outputs:
- name: sum
  type: Integer
implementation:
  container:
    image: google/cloud-sdk:latest
    command:
    - sh
    - -c
    - |
      set -e -x
      echo "$(($0+$1))" | gsutil cp - "$2"
    - {inputValue: op-1}
    - {inputValue: op2}
    - {outputPath: sum}
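To make the placeholder mechanics concrete, here is a minimal Python sketch of how the `inputValue` and `outputPath` entries in the spec above end up as the positional arguments (`$0`, `$1`, `$2`) of the shell script. It is a simplified illustration, not KFP's actual implementation; the `resolve_command` helper and the output path are hypothetical.

```python
from typing import Dict, List, Union

CommandItem = Union[str, Dict[str, str]]


def resolve_command(
    command: List[CommandItem],
    input_values: Dict[str, object],
    output_paths: Dict[str, str],
) -> List[str]:
    """Replace v1-style placeholders with concrete values (simplified)."""
    resolved = []
    for item in command:
        if isinstance(item, str):
            # Plain strings are passed through unchanged.
            resolved.append(item)
        elif "inputValue" in item:
            # The input's value is passed as a string argument.
            resolved.append(str(input_values[item["inputValue"]]))
        elif "outputPath" in item:
            # The component writes its output to a path chosen by the system.
            resolved.append(output_paths[item["outputPath"]])
        else:
            raise ValueError(f"Unsupported placeholder: {item}")
    return resolved


command = [
    "sh",
    "-c",
    'set -e -x\necho "$(($0+$1))" | gsutil cp - "$2"\n',
    {"inputValue": "op-1"},
    {"inputValue": "op2"},
    {"outputPath": "sum"},
]

resolved = resolve_command(
    command,
    input_values={"op-1": 3, "op2": 4},
    output_paths={"sum": "/tmp/outputs/sum/data"},  # hypothetical path
)
print(resolved[3:])
```

The three trailing entries are what `sh -c` exposes to the script as `$0`, `$1`, and `$2`.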

How to use v2 features in the old component spec format is not well documented.

In kfpv2, the component spec has been unified with the pipeline spec into IR YAML. It looks like this:

{
  "components": {
    "comp-fondant-component": {
      "executorLabel": "exec-fondant-component",
      "inputDefinitions": {
        "artifacts": {
          "input_manifest_path": {
            "artifactType": {
              "schemaTitle": "system.Artifact",
              "schemaVersion": "0.0.1"
            },
            "isOptional": true
          }
        },
        "parameters": {
          "component_spec": {
            "defaultValue": {},
            "isOptional": true,
            "parameterType": "STRUCT"
          },
          "input_partition_rows": {
            "isOptional": true,
            "parameterType": "STRING"
          },
          "metadata": {
            "parameterType": "STRING"
          }
        }
      },
      "outputDefinitions": {
        "artifacts": {
          "output_manifest_path": {
            "artifactType": {
              "schemaTitle": "system.Artifact",
              "schemaVersion": "0.0.1"
            }
          }
        }
      }
    }
  },
  "deploymentSpec": {
    "executors": {
      "exec-fondant-component": {
        "container": {
          "args": [
            "--input_manifest_path",
            "{{$.inputs.artifacts['input_manifest_path'].uri}}",
            "--metadata",
            "{{$.inputs.parameters['metadata']}}",
            "--component_spec",
            "{{$.inputs.parameters['component_spec']}}",
            "--input_partition_rows",
            "{{$.inputs.parameters['input_partition_rows']}}",
            "--output_manifest_path",
            "{{$.outputs.artifacts['output_manifest_path'].uri}}"
          ],
          "command": [
            "python3",
            "main.py"
          ],
          "image": "some_image"
        }
      }
    }
  },
  "pipelineInfo": {
    "name": "fondant-component"
  },
  "root": {
    "dag": {
      "outputs": {
        "artifacts": {
          "output_manifest_path": {
            "artifactSelectors": [
              {
                "outputArtifactKey": "output_manifest_path",
                "producerSubtask": "fondant-component"
              }
            ]
          }
        }
      },
      "tasks": {
        "fondant-component": {
          "cachingOptions": {
            "enableCache": true
          },
          "componentRef": {
            "name": "comp-fondant-component"
          },
          "inputs": {
            "artifacts": {
              "input_manifest_path": {
                "componentInputArtifact": "input_manifest_path"
              }
            },
            "parameters": {
              "component_spec": {
                "componentInputParameter": "component_spec"
              },
              "input_partition_rows": {
                "componentInputParameter": "input_partition_rows"
              },
              "metadata": {
                "componentInputParameter": "metadata"
              }
            }
          },
          "taskInfo": {
            "name": "fondant-component"
          }
        }
      }
    },
    "inputDefinitions": {
      "artifacts": {
        "input_manifest_path": {
          "artifactType": {
            "schemaTitle": "system.Artifact",
            "schemaVersion": "0.0.1"
          },
          "isOptional": true
        }
      },
      "parameters": {
        "component_spec": {
          "defaultValue": {},
          "isOptional": true,
          "parameterType": "STRUCT"
        },
        "input_partition_rows": {
          "isOptional": true,
          "parameterType": "STRING"
        },
        "metadata": {
          "parameterType": "STRING"
        }
      }
    },
    "outputDefinitions": {
      "artifacts": {
        "output_manifest_path": {
          "artifactType": {
            "schemaTitle": "system.Artifact",
            "schemaVersion": "0.0.1"
          }
        }
      }
    }
  },
  "schemaVersion": "2.1.0",
  "sdkVersion": "kfp-2.0.1"
}

There is no real difference between a spec describing a pipeline and one describing a component (a component is just a one-step pipeline). You can read this spec from a file or from text and use it in another pipeline.
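The "one-step pipeline" observation can be checked mechanically. The sketch below uses only the standard library and a trimmed copy of the spec above; `is_single_step_pipeline` is a hypothetical helper, not part of KFP or Fondant.

```python
from typing import Any, Dict


def is_single_step_pipeline(spec: Dict[str, Any]) -> bool:
    """Return True if the IR spec has exactly one task backed by one executor."""
    tasks = spec.get("root", {}).get("dag", {}).get("tasks", {})
    if len(tasks) != 1:
        return False
    (task,) = tasks.values()
    # The task must reference a component that is defined in the spec.
    component_name = task.get("componentRef", {}).get("name")
    component = spec.get("components", {}).get(component_name)
    if component is None:
        return False
    # The component must in turn point at a defined executor (the container).
    executors = spec.get("deploymentSpec", {}).get("executors", {})
    return component.get("executorLabel") in executors


# Trimmed version of the spec shown above: only the fields the check needs.
component_spec = {
    "components": {
        "comp-fondant-component": {"executorLabel": "exec-fondant-component"}
    },
    "deploymentSpec": {
        "executors": {
            "exec-fondant-component": {"container": {"image": "some_image"}}
        }
    },
    "root": {
        "dag": {
            "tasks": {
                "fondant-component": {
                    "componentRef": {"name": "comp-fondant-component"}
                }
            }
        }
    },
}

print(is_single_step_pipeline(component_spec))  # True
```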

I have code to generate these new IR YAML files for Fondant components.
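For readers who want a feel for what such a generator involves, here is a rough sketch that rebuilds the structure of the example spec from a component name and image. It is not the code referred to above: `build_component_ir` is a hypothetical helper, and the fixed set of arguments is simply copied from the example.

```python
import copy
import json
from typing import Any, Dict

ARTIFACT_TYPE = {"schemaTitle": "system.Artifact", "schemaVersion": "0.0.1"}


def build_component_ir(name: str, image: str) -> Dict[str, Any]:
    """Build a one-step IR spec shaped like the example (hypothetical helper)."""
    input_definitions = {
        "artifacts": {
            "input_manifest_path": {
                "artifactType": dict(ARTIFACT_TYPE),
                "isOptional": True,
            }
        },
        "parameters": {
            "component_spec": {
                "defaultValue": {},
                "isOptional": True,
                "parameterType": "STRUCT",
            },
            "input_partition_rows": {"isOptional": True, "parameterType": "STRING"},
            "metadata": {"parameterType": "STRING"},
        },
    }
    output_definitions = {
        "artifacts": {"output_manifest_path": {"artifactType": dict(ARTIFACT_TYPE)}}
    }
    # Container arguments, in the same order as in the example spec.
    args = ["--input_manifest_path", "{{$.inputs.artifacts['input_manifest_path'].uri}}"]
    for parameter in ("metadata", "component_spec", "input_partition_rows"):
        args += [f"--{parameter}", "{{$.inputs.parameters['" + parameter + "']}}"]
    args += ["--output_manifest_path", "{{$.outputs.artifacts['output_manifest_path'].uri}}"]
    # The single task forwards every pipeline-level input to the component.
    task_inputs = {
        "artifacts": {
            "input_manifest_path": {"componentInputArtifact": "input_manifest_path"}
        },
        "parameters": {
            parameter: {"componentInputParameter": parameter}
            for parameter in input_definitions["parameters"]
        },
    }
    root_outputs = {
        "artifacts": {
            "output_manifest_path": {
                "artifactSelectors": [
                    {"outputArtifactKey": "output_manifest_path", "producerSubtask": name}
                ]
            }
        }
    }
    return {
        "components": {
            f"comp-{name}": {
                "executorLabel": f"exec-{name}",
                "inputDefinitions": copy.deepcopy(input_definitions),
                "outputDefinitions": copy.deepcopy(output_definitions),
            }
        },
        "deploymentSpec": {
            "executors": {
                f"exec-{name}": {
                    "container": {
                        "args": args,
                        "command": ["python3", "main.py"],
                        "image": image,
                    }
                }
            }
        },
        "pipelineInfo": {"name": name},
        "root": {
            "dag": {
                "outputs": root_outputs,
                "tasks": {
                    name: {
                        "cachingOptions": {"enableCache": True},
                        "componentRef": {"name": f"comp-{name}"},
                        "inputs": task_inputs,
                        "taskInfo": {"name": name},
                    }
                },
            },
            "inputDefinitions": input_definitions,
            "outputDefinitions": output_definitions,
        },
        "schemaVersion": "2.1.0",
        "sdkVersion": "kfp-2.0.1",
    }


spec = build_component_ir("fondant-component", "some_image")
print(json.dumps(spec, indent=2, sort_keys=True))
```

A real generator would derive the parameters from the Fondant component spec instead of hard-coding them; that part is left out here because it depends on Fondant internals.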

PhilippeMoussalli commented 1 year ago

Thanks for the extensive description @GeorgesLorre!

Solution 1: Indeed, having two versions does not seem like the most optimal solution. Regarding the runner, I think it's a given that we would need separate runners for Vertex and KFP regardless of the version, no?

Solution 2: Importing from v2 is what we used to do in Vertex at ML6 (the new boilerplate is now v2, and I haven't worked with it yet). Although it's not well documented, we can use it properly based on the experience and boilerplate that we have. The downside is that we would then need to develop a different compiler for v2.

Solution 3: This indeed seems to be the most optimal one, but it still feels like the full-fledged v2 is more integrated with Vertex than with KFP on GKE (at least for the moment, since the official release was not too long ago). Some features that we need for selecting node pools and GPUs still seem to be missing: https://github.com/kubeflow/pipelines/issues/9682

I would be more in favor of Solution 3 to avoid additional work, but we would need to make sure that it offers all the core features that we need. I think that's a given in Vertex, but I would rather test it on the standalone KFP deployment and check whether we can select specific node pools and work with GPUs. Otherwise it will break our current workflow.

Maybe we can set up a test cluster, deploy v2 there, and do some tests?

GeorgesLorre commented 1 year ago

How to submit a KFP pipeline to Vertex manually:

1. Add this to your `pipeline.py`:

       from fondant.compiler import VertexCompiler

       compiler = VertexCompiler()
       compiler.compile(pipeline=pipeline, output_path="pipeline.json")

2. Invoke compilation: `python pipeline.py`

3. Go to the [Vertex UI](https://console.cloud.google.com/vertex-ai/pipelines/runs?project=soy-audio-379412).

4. Create a new run and select the `pipeline.json` file.

5. In the advanced options, select the KFP service account.
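The UI steps above could also be scripted. This is a minimal sketch, assuming the `google-cloud-aiplatform` package is installed and you are already authenticated against the right project. The helper names are hypothetical, and the project ID, region, and service account in the usage comment are placeholders.

```python
from typing import Any, Dict, Optional


def build_pipeline_job_kwargs(
    template_path: str,
    display_name: str,
    pipeline_root: Optional[str] = None,
    enable_caching: bool = True,
) -> Dict[str, Any]:
    """Collect the keyword arguments passed to aiplatform.PipelineJob."""
    kwargs: Dict[str, Any] = {
        "display_name": display_name,
        "template_path": template_path,  # the compiled pipeline.json
        "enable_caching": enable_caching,
    }
    if pipeline_root is not None:
        kwargs["pipeline_root"] = pipeline_root
    return kwargs


def submit_to_vertex(
    project: str, location: str, service_account: str, **job_kwargs: Any
):
    """Submit a compiled pipeline as a Vertex AI pipeline run."""
    # Imported lazily so this module loads even without the SDK installed.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location)
    job = aiplatform.PipelineJob(**job_kwargs)
    # Equivalent of selecting the service account in the advanced options.
    job.submit(service_account=service_account)
    return job


job_kwargs = build_pipeline_job_kwargs("pipeline.json", display_name="fondant-pipeline")

# Example call (placeholders, not real values):
# submit_to_vertex(
#     project="my-project",
#     location="europe-west1",
#     service_account="kfp-sa@my-project.iam.gserviceaccount.com",
#     **job_kwargs,
# )
```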

RobbeSneyders commented 1 year ago

Released in 0.6.0.