Closed by gustavosr98 1 year ago
Hey @gustavosr98,
Is this using one of our demo pipelines, or one you're modifying? If you can point us to the pipeline, that will help with reproduction steps. Thanks!
@ca-scribner sure, I left the example name in the title, but here is a direct link to the folder: https://github.com/canonical/kubeflow-examples/tree/main/e2e-wine-kfp-mlflow
Here is the pipeline.yaml generated by following the README instructions:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: e2e-wine-pipeline-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12, pipelines.kubeflow.org/pipeline_compilation_time: '2023-07-07T00:38:27.883407',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "WINE pipeline", "inputs":
      [{"name": "url"}], "name": "e2e_wine_pipeline"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12}
spec:
  entrypoint: e2e-wine-pipeline
  templates:
  - name: deploy-model
    container:
      args: []
      command: [python3, src/deploy.py, --model_uri, '{{inputs.parameters.train-Output}}']
      image: bponieckiklotz/kubeflow-e2e-seldon-mlflow-deploy-step@sha256:8cd5fc4c7aa52b9aebc71cd841e531e638d1dc9b58ddd59c4d6c78b97ee035b2
    inputs:
      parameters:
      - {name: train-Output}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
      annotations: {pipelines.kubeflow.org/component_spec: '{"description": "deploy
          model using seldon core", "implementation": {"container": {"command": ["python3",
          "src/deploy.py", "--model_uri", {"inputValue": "model_uri"}], "image": "bponieckiklotz/kubeflow-e2e-seldon-mlflow-deploy-step@sha256:8cd5fc4c7aa52b9aebc71cd841e531e638d1dc9b58ddd59c4d6c78b97ee035b2"}},
          "inputs": [{"description": "MLFlow model artifact URI", "name": "model_uri",
          "type": "String"}], "name": "Deploy model"}', pipelines.kubeflow.org/component_ref: '{"digest":
          "effb2cbf3cb6e2a328ea2ae64074968a0faaaa31532c2f46dc845ea4721f0d35", "url":
          "/home/ubuntu/kubeflow-examples/e2e-wine-kfp-mlflow/components/deploy/component.yaml"}',
        pipelines.kubeflow.org/arguments.parameters: '{"model_uri": "{{inputs.parameters.train-Output}}"}'}
  - name: download-data
    container:
      args: []
      command:
      - sh
      - -exc
      - |
        url="$0"
        output_path="$1"
        curl_options="$2"
        mkdir -p "$(dirname "$output_path")"
        curl --get "$url" --output "$output_path" $curl_options
      - '{{inputs.parameters.url}}'
      - /tmp/outputs/Data/data
      - --location
      image: byrnedo/alpine-curl@sha256:548379d0a4a0c08b9e55d9d87a592b7d35d9ab3037f4936f5ccd09d0b625a342
    inputs:
      parameters:
      - {name: url}
    outputs:
      artifacts:
      - {name: download-data-Data, path: /tmp/outputs/Data/data}
    metadata:
      annotations: {author: Alexey Volkov <alexey.volkov@ark-kun.com>, canonical_location: 'https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/web/Download/component.yaml',
        pipelines.kubeflow.org/component_spec: '{"implementation": {"container": {"command":
          ["sh", "-exc", "url=\"$0\"\noutput_path=\"$1\"\ncurl_options=\"$2\"\n\nmkdir
          -p \"$(dirname \"$output_path\")\"\ncurl --get \"$url\" --output \"$output_path\"
          $curl_options\n", {"inputValue": "Url"}, {"outputPath": "Data"}, {"inputValue":
          "curl options"}], "image": "byrnedo/alpine-curl@sha256:548379d0a4a0c08b9e55d9d87a592b7d35d9ab3037f4936f5ccd09d0b625a342"}},
          "inputs": [{"name": "Url", "type": "URI"}, {"default": "--location", "description":
          "Additional options given to the curl bprogram. See https://curl.haxx.se/docs/manpage.html",
          "name": "curl options", "type": "string"}], "metadata": {"annotations":
          {"author": "Alexey Volkov <alexey.volkov@ark-kun.com>", "canonical_location":
          "https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/web/Download/component.yaml"}},
          "name": "Download data", "outputs": [{"name": "Data"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
          "2f61f2edf713f214934bd286791877a1a3a37f31a4de4368b90e3b76743f1523", "url":
          "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml"}',
        pipelines.kubeflow.org/arguments.parameters: '{"Url": "{{inputs.parameters.url}}",
          "curl options": "--location"}'}
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: e2e-wine-pipeline
    inputs:
      parameters:
      - {name: url}
    dag:
      tasks:
      - name: deploy-model
        template: deploy-model
        dependencies: [train]
        arguments:
          parameters:
          - {name: train-Output, value: '{{tasks.train.outputs.parameters.train-Output}}'}
      - name: download-data
        template: download-data
        arguments:
          parameters:
          - {name: url, value: '{{inputs.parameters.url}}'}
      - name: preprocess
        template: preprocess
        dependencies: [download-data]
        arguments:
          artifacts:
          - {name: download-data-Data, from: '{{tasks.download-data.outputs.artifacts.download-data-Data}}'}
      - name: train
        template: train
        dependencies: [preprocess]
        arguments:
          artifacts:
          - {name: preprocess-output, from: '{{tasks.preprocess.outputs.artifacts.preprocess-output}}'}
  - name: preprocess
    container:
      args: [--file, /tmp/inputs/file/data, --output, /tmp/outputs/output/data]
      command:
      - sh
      - -c
      - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
        'pandas' 'pyarrow' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
        --quiet --no-warn-script-location 'pandas' 'pyarrow' --user) && "$0" "$@"
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def _make_parent_dirs_and_return_path(file_path: str):
            import os
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            return file_path
        def preprocess(file_path,
                       output_file):
            import pandas as pd
            df = pd.read_csv(file_path, header=0, sep=";")
            df.columns = [c.lower().replace(' ', '_') for c in df.columns]
            df.to_parquet(output_file)
        import argparse
        _parser = argparse.ArgumentParser(prog='Preprocess', description='')
        _parser.add_argument("--file", dest="file_path", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--output", dest="output_file", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())
        _outputs = preprocess(**_parsed_args)
      image: python:3.9
    inputs:
      artifacts:
      - {name: download-data-Data, path: /tmp/inputs/file/data}
    outputs:
      artifacts:
      - {name: preprocess-output, path: /tmp/outputs/output/data}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
          {"args": ["--file", {"inputPath": "file"}, "--output", {"outputPath": "output"}],
          "command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
          install --quiet --no-warn-script-location ''pandas'' ''pyarrow'' || PIP_DISABLE_PIP_VERSION_CHECK=1
          python3 -m pip install --quiet --no-warn-script-location ''pandas'' ''pyarrow''
          --user) && \"$0\" \"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf
          \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
          "def _make_parent_dirs_and_return_path(file_path: str):\n import os\n os.makedirs(os.path.dirname(file_path),
          exist_ok=True)\n return file_path\n\ndef preprocess(file_path,\n output_file):\n import
          pandas as pd\n df = pd.read_csv(file_path, header=0, sep=\";\")\n df.columns
          = [c.lower().replace('' '', ''_'') for c in df.columns]\n df.to_parquet(output_file)\n\nimport
          argparse\n_parser = argparse.ArgumentParser(prog=''Preprocess'', description='''')\n_parser.add_argument(\"--file\",
          dest=\"file_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output\",
          dest=\"output_file\", type=_make_parent_dirs_and_return_path, required=True,
          default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
          = preprocess(**_parsed_args)\n"], "image": "python:3.9"}}, "inputs": [{"name":
          "file", "type": "CSV"}], "name": "Preprocess", "outputs": [{"name": "output",
          "type": "parquet"}]}', pipelines.kubeflow.org/component_ref: '{}'}
  - name: train
    container:
      args: [--file, /tmp/inputs/file/data, '----output-paths', /tmp/outputs/Output/data]
      command:
      - sh
      - -c
      - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
        'pandas' 'pyarrow' 'sklearn' 'mlflow' 'boto3' || PIP_DISABLE_PIP_VERSION_CHECK=1
        python3 -m pip install --quiet --no-warn-script-location 'pandas' 'pyarrow'
        'sklearn' 'mlflow' 'boto3' --user) && "$0" "$@"
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def train(file_path):
            import pandas as pd
            from sklearn.model_selection import train_test_split
            import mlflow
            from sklearn.linear_model import ElasticNet
            df = pd.read_parquet(file_path)
            target_column = 'quality'
            train_x, test_x, train_y, test_y = train_test_split(
                df.drop(columns=[target_column]),
                df[target_column], test_size=.25,
                random_state=1337, stratify=df[target_column])
            with mlflow.start_run(run_name='elastic_net_models'):
                alpha = 0.5
                l1_ratio = 0.5
                lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
                lr.fit(train_x, train_y)
                result = mlflow.sklearn.log_model(lr, "model",
                                                  registered_model_name="wine-elasticnet")
                return f"{mlflow.get_artifact_uri()}/{result.artifact_path}"
        def _serialize_str(str_value: str) -> str:
            if not isinstance(str_value, str):
                raise TypeError('Value "{}" has type "{}" instead of str.'.format(
                    str(str_value), str(type(str_value))))
            return str_value
        import argparse
        _parser = argparse.ArgumentParser(prog='Train', description='')
        _parser.add_argument("--file", dest="file_path", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
        _parsed_args = vars(_parser.parse_args())
        _output_files = _parsed_args.pop("_output_paths", [])
        _outputs = train(**_parsed_args)
        _outputs = [_outputs]
        _output_serializers = [
            _serialize_str,
        ]
        import os
        for idx, output_file in enumerate(_output_files):
            try:
                os.makedirs(os.path.dirname(output_file))
            except OSError:
                pass
            with open(output_file, 'w') as f:
                f.write(_output_serializers[idx](_outputs[idx]))
      env:
      - {name: MLFLOW_TRACKING_URI, value: 'http://mlflow-server.kubeflow.svc.cluster.local:5000'}
      - {name: MLFLOW_S3_ENDPOINT_URL, value: 'http://minio.kubeflow.svc.cluster.local:9000'}
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef: {key: accesskey, name: mlpipeline-minio-artifact}
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef: {key: secretkey, name: mlpipeline-minio-artifact}
      image: python:3.9
    inputs:
      artifacts:
      - {name: preprocess-output, path: /tmp/inputs/file/data}
    outputs:
      parameters:
      - name: train-Output
        valueFrom: {path: /tmp/outputs/Output/data}
      artifacts:
      - {name: train-Output, path: /tmp/outputs/Output/data}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
          {"args": ["--file", {"inputPath": "file"}, "----output-paths", {"outputPath":
          "Output"}], "command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3
          -m pip install --quiet --no-warn-script-location ''pandas'' ''pyarrow''
          ''sklearn'' ''mlflow'' ''boto3'' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3
          -m pip install --quiet --no-warn-script-location ''pandas'' ''pyarrow''
          ''sklearn'' ''mlflow'' ''boto3'' --user) && \"$0\" \"$@\"", "sh", "-ec",
          "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
          -u \"$program_path\" \"$@\"\n", "def train(file_path):\n import pandas
          as pd\n from sklearn.model_selection import train_test_split\n import
          mlflow\n from sklearn.linear_model import ElasticNet\n\n df = pd.read_parquet(file_path)\n\n target_column
          = ''quality''\n train_x, test_x, train_y, test_y = train_test_split(\n df.drop(columns=[target_column]),\n df[target_column],
          test_size=.25,\n random_state=1337, stratify=df[target_column])\n\n with
          mlflow.start_run(run_name=''elastic_net_models''):\n alpha = 0.5\n l1_ratio
          = 0.5\n lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)\n lr.fit(train_x,
          train_y)\n result = mlflow.sklearn.log_model(lr, \"model\",\n registered_model_name=\"wine-elasticnet\")\n return
          f\"{mlflow.get_artifact_uri()}/{result.artifact_path}\"\n\ndef _serialize_str(str_value:
          str) -> str:\n if not isinstance(str_value, str):\n raise TypeError(''Value
          \"{}\" has type \"{}\" instead of str.''.format(\n str(str_value),
          str(type(str_value))))\n return str_value\n\nimport argparse\n_parser
          = argparse.ArgumentParser(prog=''Train'', description='''')\n_parser.add_argument(\"--file\",
          dest=\"file_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\",
          dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
          = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = train(**_parsed_args)\n\n_outputs
          = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport
          os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
          OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
          "image": "python:3.9"}}, "inputs": [{"name": "file", "type": "parquet"}],
          "name": "Train", "outputs": [{"name": "Output", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}'}
  arguments:
    parameters:
    - {name: url}
  serviceAccountName: pipeline-runner
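For reference, the `dag.tasks` entries of the `e2e-wine-pipeline` template above are listed alphabetically, not in run order. A minimal sketch of how the run order follows from the `dependencies` fields is below. The `dependencies` dictionary is transcribed by hand from the YAML, so it is an illustration and not something KFP or Argo produces, and it needs Python 3.9+ for `graphlib`:

```python
from graphlib import TopologicalSorter

# Task name -> tasks it depends on, transcribed by hand from the
# `dag.tasks` entries of the e2e-wine-pipeline template.
dependencies = {
    "deploy-model": ["train"],
    "download-data": [],
    "preprocess": ["download-data"],
    "train": ["preprocess"],
}

# static_order() yields each task only after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())

for position, task in enumerate(execution_order, start=1):
    print(f"{position}. {task}")
```

Because the tasks form a single chain, there is only one valid order: `download-data`, `preprocess`, `train`, `deploy-model`.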
Could you try to rerun it? This sometimes happens in Argo/KFP when events are handled in the wrong order.
Synced with @gustavosr98 on this one; it seems the issue is no longer relevant. Feel free to re-open if you run into this again.
On the second step of the run I get the following error:
Taking a look at the logs: