kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[sdk compiler] Compilation succeeds, but Vertex Pipelines validation fails #5912

Closed TrsNium closed 7 months ago

TrsNium commented 3 years ago

Environment

Steps to reproduce

When I loop over the output of the following Python function with dsl.ParallelFor, Vertex Pipelines validation fails.

from typing import NamedTuple

import kfp.components as comp
from kfp import dsl
from kfp.v2.dsl import component

@component
def print_op(message: str):
  """Prints a message."""
  print(message)

def get_parallel_offsets(
        rows_count: int,
        parallel_num: int,
) -> NamedTuple(
    "Offsets",
    [("offsets", "Offsets")],
):
    """
    Get the offset of each parallel (from the number of rows and the number of parallels in the table.)

    Parameters
    ----------
    rows_count: int
        number of bigquery table's rows
    parallel_num: int
        number of parallels
    """
    from collections import namedtuple
    import math
    import json

    if rows_count % parallel_num == 0:
        offset_step = limit = int(rows_count / parallel_num)
    else:
        offset_step = limit = math.ceil(rows_count / parallel_num)

    # NOTE: `json.dumps` can render numbers with many digits in scientific notation, so convert them to strings first.
    offsets = [
        {"index": str(index), "offset": str(offset), "upper_bounds": str(offset+limit)}
        for index, offset in enumerate(range(0, rows_count, offset_step))
    ]
    output = namedtuple("Offsets", ["offsets"])
    return output(json.dumps(offsets))

@dsl.pipeline(
    name="test",
)
def test(
    parallel_num: int=10,
) -> None:
    get_parallel_offsets_op = comp.create_component_from_func(
        func=get_parallel_offsets, base_image="python:alpine"
    )
    get_parallel_offsets_task = get_parallel_offsets_op(
        #get_count_rows_task.output, parallel_num
        3000, parallel_num
    )

    with dsl.ParallelFor(
        get_parallel_offsets_task.outputs["offsets"]
    ) as offset:
        print_op(offset.offset)
        print_op(offset.index)
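For context, here is a sketch of the compile-and-submit step that produces the traceback below; this step is not shown in the report, and the project, region, and bucket names are placeholders.

```python
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient

# Compile the pipeline defined above into a Vertex Pipelines job spec.
compiler.Compiler().compile(pipeline_func=test, package_path="test_pipeline.json")

# Submitting the job spec triggers the server-side validation that fails.
api_client = AIPlatformClient(project_id="my-project", region="asia-east1")  # placeholders
api_client.create_run_from_job_spec(
    "test_pipeline.json",
    pipeline_root="gs://my-bucket/pipeline-root",  # placeholder
)
```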

Error message

  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/kfp-1.6.3-py3.7.egg/kfp/v2/google/client/client.py", line 344, in create_run_from_job_spec
    job_id=job_id,
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/kfp-1.6.3-py3.7.egg/kfp/v2/google/client/client.py", line 228, in _submit_job
    response = request.execute()
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/Users/takuya.hirata/.pyenv/versions/3.7.1rc1/lib/python3.7/site-packages/googleapiclient/http.py", line 915, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://asia-east1-aiplatform.googleapis.com/v1beta1/projects/zozo-mlops-dev/locations/asia-east1/pipelineJobs?pipelineJobId=test-20210624125423&alt=json returned "Iterator items parameter 'pipelineparam--get-parallel-offsets-offsets' cannot be found from the inputs of the task 'for-loop-1'.". Details: "Iterator items parameter 'pipelineparam--get-parallel-offsets-offsets' cannot be found from the inputs of the task 'for-loop-1'.">
compiled json ```json { "pipelineSpec": { "components": { "comp-for-loop-1": { "dag": { "tasks": { "print-op": { "componentRef": { "name": "comp-print-op" }, "inputs": { "parameters": { "message": { "componentInputParameter": "pipelineparam--get-parallel-offsets-offsets-loop-item", "parameterExpressionSelector": "parseJson(string_value)[\"upper_bounds\"]" } } }, "taskInfo": { "name": "print-op" } } } }, "inputDefinitions": { "artifacts": { "pipelineparam--get-parallel-offsets-offsets": { "artifactType": { "schemaTitle": "system.Artifact" } } }, "parameters": { "pipelineparam--get-parallel-offsets-offsets-loop-item": { "type": "STRING" } } } }, "comp-get-parallel-offsets": { "executorLabel": "exec-get-parallel-offsets", "inputDefinitions": { "parameters": { "parallel_num": { "type": "INT" }, "rows_count": { "type": "INT" } } }, "outputDefinitions": { "artifacts": { "offsets": { "artifactType": { "schemaTitle": "system.Artifact" } } } } }, "comp-print-op": { "executorLabel": "exec-print-op", "inputDefinitions": { "parameters": { "message": { "type": "STRING" } } } } }, "deploymentSpec": { "executors": { "exec-get-parallel-offsets": { "container": { "args": [ "--rows-count", "{{$.inputs.parameters['rows_count']}}", "--parallel-num", "{{$.inputs.parameters['parallel_num']}}", "----output-paths", "{{$.outputs.artifacts['offsets'].path}}" ], "command": [ "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", "def get_parallel_offsets(\n rows_count,\n parallel_num,\n):\n \"\"\"\n Get the offset of each parallel (from the number of rows and the number of parallels in the table.)\n\n Parameters\n ----------\n rows_count: int\n number of bigquery table's rows\n parallel_num: int\n number of parallels\n \"\"\"\n from collections import namedtuple\n import math\n import json\n\n if rows_count % parallel_num == 0:\n offset_step = limit = int(rows_count / parallel_num)\n else:\n offset_step = limit = math.ceil(rows_count / parallel_num)\n\n # NOTE: When using `json.dump`, if a number with a large number of digits is included, the number will be converted to Scientific Notation format, so convert it to a string type once.\n offsets = [\n {\"index\": str(index), \"offset\": str(offset), \"upper_bounds\": str(offset+limit)}\n for index, offset in enumerate(range(0, rows_count, offset_step))\n ]\n output = namedtuple(\"Offsets\", [\"offsets\"])\n return output(json.dumps(offsets))\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Get parallel offsets', description='Get the offset of each parallel (from the number of rows and the number of parallels in the table.)')\n_parser.add_argument(\"--rows-count\", dest=\"rows_count\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--parallel-num\", dest=\"parallel_num\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = get_parallel_offsets(**_parsed_args)\n\n_output_serializers = [\n str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except OSError:\n pass\n with open(output_file, 'w') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n" ], "image": "python:alpine" } }, "exec-print-op": { "container": { "args": [ "--executor_input", "{{$}}", "--function_to_execute", "print_op", 
"--message-output-path", "{{$.inputs.parameters['message']}}" ], "command": [ "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", "\nimport json\nimport inspect\nfrom typing import *\n\n# Copyright 2021 The Kubeflow Authors\n#\n# Licensed under the Apache License, Version 2.0 (the \"License\");\n# you may not use this file except in compliance with the License.\n# You may obtain a copy of the License at\n#\n# http://www.apache.org/licenses/LICENSE-2.0\n#\n# Unless required by applicable law or agreed to in writing, software\n# distributed under the License is distributed on an \"AS IS\" BASIS,\n# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n# See the License for the specific language governing permissions and\n# limitations under the License.\n\"\"\"Classes for input/output types in KFP SDK.\n\nThese are only compatible with v2 Pipelines.\n\"\"\"\n\nimport os\nfrom typing import Dict, Generic, List, Optional, Type, TypeVar, Union\n\n\n_GCS_LOCAL_MOUNT_PREFIX = '/gcs/'\n_MINIO_LOCAL_MOUNT_PREFIX = '/minio/'\n_S3_LOCAL_MOUNT_PREFIX = '/s3/'\n\n\nclass Artifact(object):\n \"\"\"Generic Artifact class.\n\n This class is meant to represent the metadata around an input or output\n machine-learning Artifact. Artifacts have URIs, which can either be a location\n on disk (or Cloud storage) or some other resource identifier such as\n an API resource name.\n\n Artifacts carry a `metadata` field, which is a dictionary for storing\n metadata related to this artifact.\n \"\"\"\n TYPE_NAME = 'system.Artifact'\n\n def __init__(self,\n name: Optional[str] = None,\n uri: Optional[str] = None,\n metadata: Optional[Dict] = None):\n \"\"\"Initializes the Artifact with the given name, URI and metadata.\"\"\"\n self.uri = uri or ''\n self.name = name or ''\n self.metadata = metadata or {}\n\n @property\n def path(self):\n return self._get_path()\n\n @path.setter\n def path(self, path):\n self._set_path(path)\n\n def _get_path(self) -> Optional[str]:\n if self.uri.startswith('gs://'):\n return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len('gs://'):]\n elif self.uri.startswith('minio://'):\n return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len('minio://'):]\n elif self.uri.startswith('s3://'):\n return _S3_LOCAL_MOUNT_PREFIX + self.uri[len('s3://'):]\n return None\n\n def _set_path(self, path):\n if path.startswith(_GCS_LOCAL_MOUNT_PREFIX):\n path = 'gs://' + path[len(_GCS_LOCAL_MOUNT_PREFIX):]\n elif path.startswith(_MINIO_LOCAL_MOUNT_PREFIX):\n path = 'minio://' + path[len(_MINIO_LOCAL_MOUNT_PREFIX):]\n elif path.startswith(_S3_LOCAL_MOUNT_PREFIX):\n path = 's3://' + path[len(_S3_LOCAL_MOUNT_PREFIX):]\n self.uri = path\n\n\nclass Model(Artifact):\n \"\"\"An artifact representing an ML Model.\"\"\"\n TYPE_NAME = 'system.Model'\n\n def __init__(self,\n name: Optional[str] = None,\n uri: Optional[str] = None,\n metadata: Optional[Dict] = None):\n super().__init__(uri=uri, name=name, metadata=metadata)\n\n @property\n def framework(self) -> str:\n return self._get_framework()\n\n def _get_framework(self) -> str:\n return self.metadata.get('framework', '')\n\n @framework.setter\n def framework(self, framework: str):\n self._set_framework(framework)\n\n def _set_framework(self, framework: str):\n self.metadata['framework'] = framework\n\n\nclass Dataset(Artifact):\n \"\"\"An artifact representing an ML Dataset.\"\"\"\n TYPE_NAME = 'system.Dataset'\n\n def __init__(self,\n name: Optional[str] = None,\n uri: Optional[str] = None,\n 
metadata: Optional[Dict] = None):\n super().__init__(uri=uri, name=name, metadata=metadata)\n\n\nclass Metrics(Artifact):\n \"\"\"Represent a simple base Artifact type to store key-value scalar metrics.\"\"\"\n TYPE_NAME = 'system.Metrics'\n\n def __init__(self,\n name: Optional[str] = None,\n uri: Optional[str] = None,\n metadata: Optional[Dict] = None):\n super().__init__(uri=uri, name=name, metadata=metadata)\n\n def log_metric(self, metric: str, value: float):\n \"\"\"Sets a custom scalar metric.\n\n Args:\n metric: Metric key\n value: Value of the metric.\n \"\"\"\n self.metadata[metric] = value\n\n\nclass ClassificationMetrics(Artifact):\n \"\"\"Represents Artifact class to store Classification Metrics.\"\"\"\n TYPE_NAME = 'system.ClassificationMetrics'\n\n def __init__(self,\n name: Optional[str] = None,\n uri: Optional[str] = None,\n metadata: Optional[Dict] = None):\n super().__init__(uri=uri, name=name, metadata=metadata)\n\n def log_roc_data_point(self, fpr: float, tpr: float, threshold: float):\n \"\"\"Logs a single data point in the ROC Curve.\n\n Args:\n fpr: False positive rate value of the data point.\n tpr: True positive rate value of the data point.\n threshold: Threshold value for the data point.\n \"\"\"\n\n roc_reading = {\n 'confidenceThreshold': threshold,\n 'recall': tpr,\n 'falsePositiveRate': fpr\n }\n if 'confidenceMetrics' not in self.metadata.keys():\n self.metadata['confidenceMetrics'] = []\n\n self.metadata['confidenceMetrics'].append(roc_reading)\n\n def log_roc_curve(self, fpr: List[float], tpr: List[float],\n threshold: List[float]):\n \"\"\"Logs an ROC curve.\n\n The list length of fpr, tpr and threshold must be the same.\n\n Args:\n fpr: List of false positive rate values.\n tpr: List of true positive rate values.\n threshold: List of threshold values.\n \"\"\"\n if len(fpr) != len(tpr) or len(fpr) != len(threshold) or len(tpr) != len(\n threshold):\n raise ValueError('Length of fpr, tpr and threshold must be the same. '\n 'Got lengths {}, {} and {} respectively.'.format(\n len(fpr), len(tpr), len(threshold)))\n\n for i in range(len(fpr)):\n self.log_roc_data_point(fpr=fpr[i], tpr=tpr[i], threshold=threshold[i])\n\n def set_confusion_matrix_categories(self, categories: List[str]):\n \"\"\"Stores confusion matrix categories.\n\n Args:\n categories: List of strings specifying the categories.\n \"\"\"\n\n self._categories = []\n annotation_specs = []\n for category in categories:\n annotation_spec = {'displayName': category}\n self._categories.append(category)\n annotation_specs.append(annotation_spec)\n\n self._matrix = []\n for row in range(len(self._categories)):\n self._matrix.append({'row': [0] * len(self._categories)})\n\n self._confusion_matrix = {}\n self._confusion_matrix['annotationSpecs'] = annotation_specs\n self._confusion_matrix['rows'] = self._matrix\n self.metadata['confusionMatrix'] = self._confusion_matrix\n\n def log_confusion_matrix_row(self, row_category: str, row: List[float]):\n \"\"\"Logs a confusion matrix row.\n\n Args:\n row_category: Category to which the row belongs.\n row: List of integers specifying the values for the row.\n\n Raises:\n ValueError: If row_category is not in the list of categories\n set in set_categories call.\n \"\"\"\n if row_category not in self._categories:\n raise ValueError('Invalid category: {} passed. Expected one of: {}'.\\\n format(row_category, self._categories))\n\n if len(row) != len(self._categories):\n raise ValueError('Invalid row. 
Expected size: {} got: {}'.\\\n format(len(self._categories), len(row)))\n\n self._matrix[self._categories.index(row_category)] = {'row': row}\n self.metadata['confusionMatrix'] = self._confusion_matrix\n\n def log_confusion_matrix_cell(self, row_category: str, col_category: str,\n value: int):\n \"\"\"Logs a cell in the confusion matrix.\n\n Args:\n row_category: String representing the name of the row category.\n col_category: String representing the name of the column category.\n value: Int value of the cell.\n\n Raises:\n ValueError: If row_category or col_category is not in the list of\n categories set in set_categories.\n \"\"\"\n if row_category not in self._categories:\n raise ValueError('Invalid category: {} passed. Expected one of: {}'.\\\n format(row_category, self._categories))\n\n if col_category not in self._categories:\n raise ValueError('Invalid category: {} passed. Expected one of: {}'.\\\n format(row_category, self._categories))\n\n self._matrix[self._categories.index(row_category)]['row'][\n self._categories.index(col_category)] = value\n self.metadata['confusionMatrix'] = self._confusion_matrix\n\n def log_confusion_matrix(self, categories: List[str],\n matrix: List[List[int]]):\n \"\"\"Logs a confusion matrix.\n\n Args:\n categories: List of the category names.\n matrix: Complete confusion matrix.\n\n Raises:\n ValueError: Length of categories does not match number of rows or columns.\n \"\"\"\n self.set_confusion_matrix_categories(categories)\n\n if len(matrix) != len(categories):\n raise ValueError('Invalid matrix: {} passed for categories: {}'.\\\n format(matrix, categories))\n\n for index in range(len(categories)):\n if len(matrix[index]) != len(categories):\n raise ValueError('Invalid matrix: {} passed for categories: {}'.\\\n format(matrix, categories))\n\n self.log_confusion_matrix_row(categories[index], matrix[index])\n\n self.metadata['confusionMatrix'] = self._confusion_matrix\n\n\nclass SlicedClassificationMetrics(Artifact):\n \"\"\"Metrics class representing Sliced Classification Metrics.\n\n Similar to ClassificationMetrics clients using this class are expected to use\n log methods of the class to log metrics with the difference being each log\n method takes a slice to associate the ClassificationMetrics.\n\n \"\"\"\n\n TYPE_NAME = 'system.SlicedClassificationMetrics'\n\n def __init__(self,\n name: Optional[str] = None,\n uri: Optional[str] = None,\n metadata: Optional[Dict] = None):\n super().__init__(uri=uri, name=name, metadata=metadata)\n\n def _upsert_classification_metrics_for_slice(self, slice: str):\n \"\"\"Upserts the classification metrics instance for a slice.\"\"\"\n if slice not in self._sliced_metrics:\n self._sliced_metrics[slice] = ClassificationMetrics()\n\n def _update_metadata(self, slice: str):\n \"\"\"Updates metadata to adhere to the metrics schema.\"\"\"\n self.metadata = {}\n self.metadata['evaluationSlices'] = []\n for slice in self._sliced_metrics.keys():\n slice_metrics = {\n 'slice': slice,\n 'sliceClassificationMetrics': self._sliced_metrics[slice].metadata\n }\n self.metadata['evaluationSlices'].append(slice_metrics)\n\n def log_roc_reading(self, slice: str, threshold: float, tpr: float,\n fpr: float):\n \"\"\"Logs a single data point in the ROC Curve of a slice.\n\n Args:\n slice: String representing slice label.\n threshold: Thresold value for the data point.\n tpr: True positive rate value of the data point.\n fpr: False positive rate value of the data point.\n \"\"\"\n\n 
self._upsert_classification_metrics_for_slice(slice)\n self._sliced_metrics[slice].log_roc_reading(threshold, tpr, fpr)\n self._update_metadata(slice)\n\n def load_roc_readings(self, slice: str, readings: List[List[float]]):\n \"\"\"Supports bulk loading ROC Curve readings for a slice.\n\n Args:\n slice: String representing slice label.\n readings: A 2-D list providing ROC Curve data points.\n The expected order of the data points is: threshold,\n true_positive_rate, false_positive_rate.\n \"\"\"\n self._upsert_classification_metrics_for_slice(slice)\n self._sliced_metrics[slice].load_roc_readings(readings)\n self._update_metadata(slice)\n\n def set_confusion_matrix_categories(self, slice: str, categories: List[str]):\n \"\"\"Stores confusion matrix categories for a slice..\n\n Categories are stored in the internal metrics_utils.ConfusionMatrix\n instance of the slice.\n\n Args:\n slice: String representing slice label.\n categories: List of strings specifying the categories.\n \"\"\"\n self._upsert_classification_metrics_for_slice(slice)\n self._sliced_metrics[slice].set_confusion_matrix_categories(categories)\n self._update_metadata(slice)\n\n def log_confusion_matrix_row(self, slice: str, row_category: str,\n row: List[int]):\n \"\"\"Logs a confusion matrix row for a slice.\n\n Row is updated on the internal metrics_utils.ConfusionMatrix\n instance of the slice.\n\n Args:\n slice: String representing slice label.\n row_category: Category to which the row belongs.\n row: List of integers specifying the values for the row.\n \"\"\"\n self._upsert_classification_metrics_for_slice(slice)\n self._sliced_metrics[slice].log_confusion_matrix_row(row_category, row)\n self._update_metadata(slice)\n\n def log_confusion_matrix_cell(self, slice: str, row_category: str,\n col_category: str, value: int):\n \"\"\"Logs a confusion matrix cell for a slice..\n\n Cell is updated on the internal metrics_utils.ConfusionMatrix\n instance of the slice.\n\n Args:\n slice: String representing slice label.\n row_category: String representing the name of the row category.\n col_category: String representing the name of the column category.\n value: Int value of the cell.\n \"\"\"\n self._upsert_classification_metrics_for_slice(slice)\n self._sliced_metrics[slice].log_confusion_matrix_cell(\n row_category, col_category, value)\n self._update_metadata(slice)\n\n def load_confusion_matrix(self, slice: str, categories: List[str],\n matrix: List[List[int]]):\n \"\"\"Supports bulk loading the whole confusion matrix for a slice.\n\n Args:\n slice: String representing slice label.\n categories: List of the category names.\n matrix: Complete confusion matrix.\n \"\"\"\n self._upsert_classification_metrics_for_slice(slice)\n self._sliced_metrics[slice].log_confusion_matrix_cell(categories, matrix)\n self._update_metadata(slice)\n\n\nT = TypeVar('T')\n\n\nclass InputAnnotation():\n \"\"\"Marker type for input artifacts.\"\"\"\n pass\n\n\n\nclass OutputAnnotation():\n \"\"\"Marker type for output artifacts.\"\"\"\n pass\n\n\n# TODO: Use typing.Annotated instead of this hack.\n# With typing.Annotated (Python 3.9+ or typing_extensions package), the\n# following would look like:\n# Input = typing.Annotated[T, InputAnnotation]\n# Output = typing.Annotated[T, OutputAnnotation]\n\n\n# Input represents an Input artifact of type T.\nInput = Union[T, InputAnnotation]\n\n# Output represents an Output artifact of type T.\nOutput = Union[T, OutputAnnotation]\n\n\ndef is_artifact_annotation(typ) -> bool:\n if hasattr(typ, '_subs_tree'): # 
Python 3.6\n subs_tree = typ._subs_tree()\n return len(subs_tree) == 3 and subs_tree[0] == Union and subs_tree[2] in [InputAnnotation, OutputAnnotation]\n\n if not hasattr(typ, '__origin__'):\n return False\n\n\n if typ.__origin__ != Union and type(typ.__origin__) != type(Union):\n return False\n\n\n if not hasattr(typ, '__args__') or len(typ.__args__) != 2:\n return False\n\n if typ.__args__[1] not in [InputAnnotation, OutputAnnotation]:\n return False\n\n return True\n\ndef is_input_artifact(typ) -> bool:\n \"\"\"Returns True if typ is of type Input[T].\"\"\"\n if not is_artifact_annotation(typ):\n return False\n\n if hasattr(typ, '_subs_tree'): # Python 3.6\n subs_tree = typ._subs_tree()\n return len(subs_tree) == 3 and subs_tree[2] == InputAnnotation\n\n return typ.__args__[1] == InputAnnotation\n\ndef is_output_artifact(typ) -> bool:\n \"\"\"Returns True if typ is of type Output[T].\"\"\"\n if not is_artifact_annotation(typ):\n return False\n\n if hasattr(typ, '_subs_tree'): # Python 3.6\n subs_tree = typ._subs_tree()\n return len(subs_tree) == 3 and subs_tree[2] == OutputAnnotation\n\n return typ.__args__[1] == OutputAnnotation\n\ndef get_io_artifact_class(typ):\n if not is_artifact_annotation(typ):\n return None\n if typ == Input or typ == Output:\n return None\n\n if hasattr(typ, '_subs_tree'): # Python 3.6\n subs_tree = typ._subs_tree()\n if len(subs_tree) != 3:\n return None\n return subs_tree[1]\n\n return typ.__args__[0]\n\ndef get_io_artifact_annotation(typ):\n if not is_artifact_annotation(typ):\n return None\n\n if hasattr(typ, '_subs_tree'): # Python 3.6\n subs_tree = typ._subs_tree()\n if len(subs_tree) != 3:\n return None\n return subs_tree[2]\n\n return typ.__args__[1]\n\n\n\n_SCHEMA_TITLE_TO_TYPE: Dict[str, Artifact] = {\n x.TYPE_NAME: x\n for x in [Artifact, Model, Dataset, Metrics, ClassificationMetrics]\n}\n\n\ndef create_runtime_artifact(runtime_artifact: Dict) -> Artifact:\n \"\"\"Creates an Artifact instance from the specified RuntimeArtifact.\n\n Args:\n runtime_artifact: Dictionary representing JSON-encoded RuntimeArtifact.\n \"\"\"\n schema_title = runtime_artifact.get('type', {}).get('schemaTitle', '')\n\n artifact_type = _SCHEMA_TITLE_TO_TYPE.get(schema_title)\n if not artifact_type:\n artifact_type = Artifact\n return artifact_type(\n uri=runtime_artifact.get('uri', ''),\n name=runtime_artifact.get('name', ''),\n metadata=runtime_artifact.get('metadata', {}),\n )\n\nclass InputPath:\n '''When creating component from function, :class:`.InputPath` should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''\n def __init__(self, type=None):\n self.type = type\n\nclass OutputPath:\n '''When creating component from function, :class:`.OutputPath` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''\n def __init__(self, type=None):\n self.type = type\n\nclass Executor():\n \"\"\"Executor executes v2-based Python function components.\"\"\"\n\n def __init__(self, executor_input: Dict, function_to_execute: Callable):\n self._func = function_to_execute\n self._input = executor_input\n self._input_artifacts: Dict[str, Artifact] = {}\n self._output_artifacts: Dict[str, Artifact] = {}\n\n for name, artifacts in self._input.get('inputs', {}).get('artifacts',\n {}).items():\n artifacts_list = artifacts.get('artifacts')\n if 
artifacts_list:\n self._input_artifacts[name] = self._make_input_artifact(\n artifacts_list[0])\n\n for name, artifacts in self._input.get('outputs', {}).get('artifacts',\n {}).items():\n artifacts_list = artifacts.get('artifacts')\n if artifacts_list:\n self._output_artifacts[name] = self._make_output_artifact(\n artifacts_list[0])\n\n self._return_annotation = inspect.signature(self._func).return_annotation\n self._executor_output = {}\n\n @classmethod\n def _make_input_artifact(cls, runtime_artifact: Dict):\n return create_runtime_artifact(runtime_artifact)\n\n @classmethod\n def _make_output_artifact(cls, runtime_artifact: Dict):\n import os\n artifact = create_runtime_artifact(runtime_artifact)\n os.makedirs(os.path.dirname(artifact.path), exist_ok=True)\n return artifact\n\n def _get_input_artifact(self, name: str):\n return self._input_artifacts.get(name)\n\n def _get_output_artifact(self, name: str):\n return self._output_artifacts.get(name)\n\n def _get_input_parameter_value(self, parameter_name: str, parameter_type: Any):\n parameter = self._input.get('inputs', {}).get('parameters',\n {}).get(parameter_name, None)\n if parameter is None:\n return None\n\n if parameter.get('stringValue'):\n if parameter_type == str:\n return parameter['stringValue']\n elif parameter_type == bool:\n # Use `.lower()` so it can also handle 'True' and 'False' (resulted from\n # `str(True)` and `str(False)`, respectively.\n return json.loads(parameter['stringValue'].lower())\n else:\n return json.loads(parameter['stringValue'])\n elif parameter.get('intValue'):\n return int(parameter['intValue'])\n elif parameter.get('doubleValue'):\n return float(parameter['doubleValue'])\n\n def _get_output_parameter_path(self, parameter_name: str):\n parameter_name = self._maybe_strip_path_suffix(parameter_name)\n parameter = self._input.get('outputs',\n {}).get('parameters',\n {}).get(parameter_name, None)\n if parameter is None:\n return None\n\n import os\n path = parameter.get('outputFile', None)\n if path:\n os.makedirs(os.path.dirname(path), exist_ok=True)\n return path\n\n def _get_output_artifact_path(self, artifact_name: str):\n artifact_name = self._maybe_strip_path_suffix(artifact_name)\n output_artifact = self._output_artifacts.get(artifact_name)\n if not output_artifact:\n raise ValueError(\n 'Failed to get output artifact path for artifact name {}'.format(\n artifact_name))\n return output_artifact.path\n\n def _get_input_artifact_path(self, artifact_name: str):\n artifact_name = self._maybe_strip_path_suffix(artifact_name)\n input_artifact = self._input_artifacts.get(artifact_name)\n if not input_artifact:\n raise ValueError(\n 'Failed to get input artifact path for artifact name {}'.format(\n artifact_name))\n return input_artifact.path\n\n def _write_output_parameter_value(self, name: str,\n value: Union[str, int, float, bool, dict,\n list, Dict, List]):\n if type(value) == str:\n output = {'stringValue': value}\n elif type(value) == int:\n output = {'intValue': value}\n elif type(value) == float:\n output = {'doubleValue': value}\n else:\n # For bool, list, dict, List, Dict, json serialize the value.\n output = {'stringValue': json.dumps(value)}\n\n if not self._executor_output.get('parameters'):\n self._executor_output['parameters'] = {}\n\n self._executor_output['parameters'][name] = output\n\n def _write_output_artifact_payload(self, name: str, value: Any):\n path = self._get_output_artifact_path(name)\n with open(path, 'w') as f:\n f.write(str(value))\n\n # TODO: extract to a util\n 
@classmethod\n def _get_short_type_name(cls, type_name: str) -> str:\n \"\"\"Extracts the short form type name.\n\n This method is used for looking up serializer for a given type.\n\n For example:\n typing.List -> List\n typing.List[int] -> List\n typing.Dict[str, str] -> Dict\n List -> List\n str -> str\n\n Args:\n type_name: The original type name.\n\n Returns:\n The short form type name or the original name if pattern doesn't match.\n \"\"\"\n import re\n match = re.match('(typing\\.)?(?P\\w+)(?:\\[.+\\])?', type_name)\n if match:\n return match.group('type')\n else:\n return type_name\n\n # TODO: merge with type_utils.is_parameter_type\n @classmethod\n def _is_parameter(cls, annotation: Any) -> bool:\n if type(annotation) == type:\n return annotation in [str, int, float, bool, dict, list]\n\n # Annotation could be, for instance `typing.Dict[str, str]`, etc.\n return cls._get_short_type_name(str(annotation)) in ['Dict', 'List']\n\n @classmethod\n def _is_artifact(cls, annotation: Any) -> bool:\n if type(annotation) == type:\n return issubclass(annotation, Artifact)\n return False\n\n @classmethod\n def _is_named_tuple(cls, annotation: Any) -> bool:\n if type(annotation) == type:\n return issubclass(annotation, tuple) and hasattr(\n annotation, '_fields') and hasattr(annotation, '__annotations__')\n return False\n\n def _handle_single_return_value(self, output_name: str, annotation_type: Any,\n return_value: Any):\n if self._is_parameter(annotation_type):\n if type(return_value) != annotation_type:\n raise ValueError(\n 'Function `{}` returned value of type {}; want type {}'.format(\n self._func.__name__, type(return_value), annotation_type))\n self._write_output_parameter_value(output_name, return_value)\n elif self._is_artifact(annotation_type):\n self._write_output_artifact_payload(output_name, return_value)\n else:\n raise RuntimeError(\n 'Unknown return type: {}. Must be one of `str`, `int`, `float`, or a'\n ' subclass of `Artifact`'.format(annotation_type))\n\n def _write_executor_output(self, func_output: Optional[Any] = None):\n if self._output_artifacts:\n self._executor_output['artifacts'] = {}\n\n for name, artifact in self._output_artifacts.items():\n runtime_artifact = {\n 'name': artifact.name,\n 'uri': artifact.uri,\n 'metadata': artifact.metadata,\n }\n artifacts_list = {'artifacts': [runtime_artifact]}\n\n self._executor_output['artifacts'][name] = artifacts_list\n\n if func_output is not None:\n if self._is_parameter(self._return_annotation) or self._is_artifact(\n self._return_annotation):\n # Note: single output is named `Output` in component.yaml.\n self._handle_single_return_value('Output', self._return_annotation,\n func_output)\n elif self._is_named_tuple(self._return_annotation):\n if len(self._return_annotation._fields) != len(func_output):\n raise RuntimeError(\n 'Expected {} return values from function `{}`, got {}'.format(\n len(self._return_annotation._fields), self._func.__name__,\n len(func_output)))\n for i in range(len(self._return_annotation._fields)):\n field = self._return_annotation._fields[i]\n field_type = self._return_annotation.__annotations__[field]\n if type(func_output) == tuple:\n field_value = func_output[i]\n else:\n field_value = getattr(func_output, field)\n self._handle_single_return_value(field, field_type, field_value)\n else:\n raise RuntimeError(\n 'Unknown return type: {}. 
Must be one of `str`, `int`, `float`, a'\n ' subclass of `Artifact`, or a NamedTuple collection of these types.'\n .format(self._return_annotation))\n\n import os\n os.makedirs(\n os.path.dirname(self._input['outputs']['outputFile']), exist_ok=True)\n with open(self._input['outputs']['outputFile'], 'w') as f:\n f.write(json.dumps(self._executor_output))\n\n def _maybe_strip_path_suffix(self, name) -> str:\n if name.endswith('_path'):\n name = name[0:-len('_path')]\n if name.endswith('_file'):\n name = name[0:-len('_file')]\n return name\n\n def execute(self):\n annotations = inspect.getfullargspec(self._func).annotations\n\n # Function arguments.\n func_kwargs = {}\n\n for k, v in annotations.items():\n if k == 'return':\n continue\n\n if self._is_parameter(v):\n func_kwargs[k] = self._get_input_parameter_value(k, v)\n\n if is_artifact_annotation(v):\n if is_input_artifact(v):\n func_kwargs[k] = self._get_input_artifact(k)\n if is_output_artifact(v):\n func_kwargs[k] = self._get_output_artifact(k)\n\n elif isinstance(v, OutputPath):\n if self._is_parameter(v.type):\n func_kwargs[k] = self._get_output_parameter_path(k)\n else:\n func_kwargs[k] = self._get_output_artifact_path(k)\n elif isinstance(v, InputPath):\n func_kwargs[k] = self._get_input_artifact_path(k)\n\n result = self._func(**func_kwargs)\n self._write_executor_output(result)\n\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n\ndef executor_main():\n import argparse\n import json\n\n parser = argparse.ArgumentParser(description='Process some integers.')\n parser.add_argument('--executor_input', type=str)\n parser.add_argument('--function_to_execute', type=str)\n\n args, _ = parser.parse_known_args()\n executor_input = json.loads(args.executor_input)\n function_to_execute = globals()[args.function_to_execute]\n\n executor = Executor(executor_input=executor_input,\n function_to_execute=function_to_execute)\n\n executor.execute()\n\n\nif __name__ == '__main__':\n executor_main()\n" ], "image": "python:3.7" } } } }, "pipelineInfo": { "name": "test" }, "root": { "dag": { "tasks": { "for-loop-1": { "componentRef": { "name": "comp-for-loop-1" }, "dependentTasks": [ "get-parallel-offsets" ], "inputs": { "artifacts": { "pipelineparam--get-parallel-offsets-offsets": { "taskOutputArtifact": { "outputArtifactKey": "offsets", "producerTask": "get-parallel-offsets" } } } }, "parameterIterator": { "itemInput": "pipelineparam--get-parallel-offsets-offsets-loop-item", "items": { "inputParameter": "pipelineparam--get-parallel-offsets-offsets" } }, "taskInfo": { "name": "for-loop-1" } }, "get-parallel-offsets": { "componentRef": { "name": "comp-get-parallel-offsets" }, "inputs": { "parameters": { "parallel_num": { "componentInputParameter": "parallel_num" }, "rows_count": { "runtimeValue": { "constantValue": { "intValue": "3000" } } } } }, "taskInfo": { "name": "get-parallel-offsets" } } } }, "inputDefinitions": { "parameters": { "parallel_num": { "type": "INT" } } } }, "schemaVersion": "2.0.0", "sdkVersion": "kfp-1.6.4" }, "runtimeConfig": { "parameters": { "parallel_num": { "intValue": "10" } } } } ```

Expected result

Successful validation of Vertex Pipelines.

Materials and Reference


Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

TrsNium commented 3 years ago
    "pipelineInfo": {
      "name": "test"
    },
    "root": {
      "dag": {
        "tasks": {
          "for-loop-1": {
            "componentRef": {
              "name": "comp-for-loop-1"
            },
            "dependentTasks": [
              "get-parallel-offsets"
            ],
            "inputs": {
              "artifacts": {
                "pipelineparam--get-parallel-offsets-offsets": {
                  "taskOutputArtifact": {
                    "outputArtifactKey": "offsets",
                    "producerTask": "get-parallel-offsets"
                  }
                }
              }
            },
            "parameterIterator": {
              "itemInput": "pipelineparam--get-parallel-offsets-offsets-loop-item",
              "items": {
                "inputParameter": "pipelineparam--get-parallel-offsets-offsets"
              }
            },
            "taskInfo": {
              "name": "for-loop-1"
            }
          },
          "get-parallel-offsets": {
            "componentRef": {
              "name": "comp-get-parallel-offsets"
            },
            "inputs": {
              "parameters": {
                "parallel_num": {
                  "componentInputParameter": "parallel_num"
                },
                "rows_count": {
                  "runtimeValue": {
                    "constantValue": {
                      "intValue": "3000"
                    }
                  }
                }
              }
            },
            "taskInfo": {
              "name": "get-parallel-offsets"
            }
          }
        }
      },
      "inputDefinitions": {
        "parameters": {
          "parallel_num": {
            "type": "INT"
          }
        }
      }
    },
    "schemaVersion": "2.0.0",
    "sdkVersion": "kfp-1.6.3"
  },
  "runtimeConfig": {
    "parameters": {
      "parallel_num": {
        "intValue": "10"
      }
    }
  }
}

~I was looking at the compiled DSL, and it looks like there is no dependency on print_op.~ ~This is unrelated to the above error, but it might cause a problem later.~

{
  "pipelineSpec": {
    "components": {
      "comp-for-loop-1": {
        "dag": {
          "tasks": {
            "print-op": {
              "componentRef": {
                "name": "comp-print-op"
              },
              "inputs": {
                "parameters": {
                  "message": {
                    "componentInputParameter": "pipelineparam--get-parallel-offsets-offsets-loop-item",
                    "parameterExpressionSelector": "parseJson(string_value)[\"upper_bounds\"]"
                  }
                }
              },
              "taskInfo": {
                "name": "print-op"
              }
            }
          }
        },
        "inputDefinitions": {
          "artifacts": {
            "pipelineparam--get-parallel-offsets-offsets": {
              "artifactType": {
                "schemaTitle": "system.Artifact"
              }
            }
          },
          "parameters": {
            "pipelineparam--get-parallel-offsets-offsets-loop-item": {
              "type": "STRING"
            }
          }
        }
      },

The comment above was my mistake; it is defined as a DAG in the component.

zijianjoy commented 3 years ago

cc @chensun

TrsNium commented 3 years ago
from typing import NamedTuple
from kfp.v2.dsl import component

@component
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
  'ExampleOutputs',
  [
    ('sum', float),
    ('product', float)
  ]):
  """Example function that demonstrates how to return multiple values."""  
  sum_value = a + b
  product_value = a * b

  from collections import namedtuple
  example_output = namedtuple('ExampleOutputs', ['sum', 'product'])
  return example_output(sum_value, product_value)
      "comp-multiple-return-values-example": {
        "executorLabel": "exec-multiple-return-values-example",
        "inputDefinitions": {
          "parameters": {
            "a": {
              "type": "DOUBLE"
            },
            "b": {
              "type": "DOUBLE"
            }
          }
        },
        "outputDefinitions": {
          "parameters": {
            "product": {
              "type": "DOUBLE"
            },
            "sum": {
              "type": "DOUBLE"
            }
          }
        }
      },

When I compiled the example function, I found that the outputDefinitions field of the compiled JSON was set to parameters rather than artifact information.

And in the v1 compiler, I noticed that both artifacts and parameters are defined as outputs.

TrsNium commented 3 years ago

Could you please share any v2 compiler or compiled dsl specification to make it easier to contribute?

chensun commented 3 years ago

> And in the v1 compiler, I noticed that both artifacts and parameters are defined as outputs.

> Could you please share any v2 compiler or compiled dsl specification to make it easier to contribute?

Hi @TrsNium, thanks for opening this issue with all the details. In v2, we distinguish between parameters and artifacts, and they have different behaviors. The decision of parameter vs. artifact is based on the user-declared types. Here's our latest doc touching on this topic: https://www.kubeflow.org/docs/components/pipelines/sdk/v2/v2-compatibility/#building-pipelines-using-the-kubeflow-pipelines-sdk-v2

In short, inputs/outputs typed as str, int, float, bool, dict, or list are treated as parameters, and everything else, including untyped inputs/outputs, is treated as artifacts. Apologies that our doc isn't super clear on this topic; we will have some additional docs available soon.
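As a minimal illustration of this rule (both components below are made-up examples, not from this issue): a return annotation built from parameter types compiles to output parameters, while an `Output[...]` annotation compiles to an output artifact.

```python
from typing import NamedTuple
from kfp.v2.dsl import component, Output, Dataset

@component
def emits_parameters(n: int) -> NamedTuple("Outputs", [("count", int), ("label", str)]):
    # int/str/float/bool/dict/list fields become output *parameters* in the compiled spec.
    from collections import namedtuple
    return namedtuple("Outputs", ["count", "label"])(n, str(n))

@component
def emits_artifact(data: Output[Dataset]):
    # Anything else, such as this Output[Dataset] annotation, becomes an output *artifact*.
    with open(data.path, "w") as f:
        f.write("some rows")
```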

Back to your example, your offsets output seems to be intended as an output parameter. Can you please try typing it as dict or typing.Dict? I think that will probably solve this issue.
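A minimal, unverified sketch of that suggestion applied to get_parallel_offsets above; dict/typing.Dict is what is suggested, but since dsl.ParallelFor iterates over a list, a plain list annotation is assumed here:

```python
from typing import NamedTuple

def get_parallel_offsets(
    rows_count: int,
    parallel_num: int,
) -> NamedTuple("Offsets", [("offsets", list)]):  # built-in parameter type instead of the custom "Offsets" string
    """Compute one offset range per parallel worker."""
    from collections import namedtuple
    import math

    offset_step = limit = math.ceil(rows_count / parallel_num)
    offsets = [
        {"index": str(index), "offset": str(offset), "upper_bounds": str(offset + limit)}
        for index, offset in enumerate(range(0, rows_count, offset_step))
    ]
    # Return the list itself rather than a JSON string, so the compiler can emit
    # an output parameter that dsl.ParallelFor can iterate over.
    return namedtuple("Offsets", ["offsets"])(offsets)
```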

That being said, I realize we probably have a bug here: we cannot define an output artifact using a NamedTuple return annotation, yet the compiler doesn't throw any error in that case. We'll try to fix that separately.

TrsNium commented 3 years ago

Hi @chensun, typing.Dict and typing.List did not work, so I fixed that in this PR: https://github.com/kubeflow/pipelines/pull/5979.

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

rimolive commented 7 months ago

Closing this issue. No activity for more than a year.

/close

google-oss-prow[bot] commented 7 months ago

@rimolive: Closing this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/5912#issuecomment-2016830363):

> Closing this issue. No activity for more than a year.
>
> /close

Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.