Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License

[Azure ML CLI/SDK v2] Datastore ID for custom model registered via pipeline output is wrong #38406

Closed: trancenoid closed this issue 2 weeks ago

trancenoid commented 3 weeks ago

Describe the bug
I am registering the model using the following component and pipeline scripts:

Component:

```python
from mldesigner import command_component, Input, Output
import pandas as pd
import json
import os
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error
import mlflow

@command_component(
    name="nyctaxi_trainxgb",
    version="1",
    display_name="Train XGB Regressor",
    description="Train XGB Regressor for Fare prediction",
    environment="azureml:DDS-MarketingLeadPrediction-Deploy-Env:7",
)
def nyc_train_xgb(
    train_data: Input(type="uri_folder"),  # type: ignore
    test_data: Input(type="uri_folder"),  # type: ignore
    Model: Output(type="mlflow_model"),  # type: ignore
):
    # Enable MLflow autologging
    mlflow.start_run()
    mlflow.sklearn.autolog()
    mlflow.set_tag("model-name", "XGB Regressor")

    # Load training data
    train_X = pd.read_csv(os.path.join(train_data, 'X.csv'))
    train_y = pd.read_csv(os.path.join(train_data, 'y.csv'))
    with open(os.path.join(train_data, 'train_metadata.json'), 'r') as f:
        train_metadata = json.load(f)

    # Load test data
    test_X = pd.read_csv(os.path.join(test_data, 'X.csv'))
    test_y = pd.read_csv(os.path.join(test_data, 'y.csv'))
    with open(os.path.join(test_data, 'test_metadata.json'), 'r') as f:
        test_metadata = json.load(f)

    # Train XGB model
    model = XGBRegressor().fit(train_X, train_y)
    mlflow.log_metric("training_mse", mean_squared_error(train_y, model.predict(train_X)))

    # Make predictions and calculate score
    predictions = model.predict(test_X)
    score = mean_squared_error(test_y, predictions)
    mlflow.log_metric("validation mse", score)

    # Save model using MLflow
    mlflow.sklearn.save_model(model, Model)

    # Save tags alongside the model output
    metadata = {"tags": {"date range": train_metadata['tags']["date_range"], "name": "XGB Regressor"},
                "description": "XGB Regressor trained on NYC Taxi data"}

    with open(os.path.join(Model, 'metadata.json'), 'w') as f:
        json.dump(metadata, f)
```

Pipeline:

```python
import os
import sys

sys.path.append(os.path.join(os.getcwd(), "ml"))

from src.components.XGB_train import *

from azure.ai.ml import MLClient, Input, Output, dsl
from azure.ai.ml.constants import AssetTypes
from azure.identity import DefaultAzureCredential
import time
import json

# Initialize MLClient
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential, path='config.json')

def get_latest_version(model_name):
    try:
        latest_version = ml_client.models.get(name=model_name, label="latest")
        return latest_version.version
    except Exception as e:
        print(f"[Ignore] {e}")
        return None

@dsl.pipeline(
    description="NYC Taxi XGB Training Pipeline",
    compute="cpu-cluster"
)
def xgb_training_pipeline(
    train_data: Input,
    test_data: Input
):
    # Train XGB model
    train_step = nyc_train_xgb(
        train_data=train_data,
        test_data=test_data
    )

    return {
        "nyctaxi_xgb_model": train_step.outputs.Model
    }

# Get input datasets
VERSION = '1'  # adjust as needed
train_data = ml_client.data.get("NYCTAXI-MERGED_train-data", version=VERSION)
test_data = ml_client.data.get("NYCTAXI-MERGED_test-data", version=VERSION)

# Create pipeline
pipeline = xgb_training_pipeline(
    train_data=Input(type=AssetTypes.URI_FOLDER, path=train_data.path),
    test_data=Input(type=AssetTypes.URI_FOLDER, path=test_data.path)
)

# Get next model version
model_name = "NYCTAXI-XGB-model"
latest_version = get_latest_version(model_name)
new_version = '1' if latest_version is None else str(int(latest_version) + 1)

pipeline.outputs.nyctaxi_xgb_model.name = model_name
pipeline.outputs.nyctaxi_xgb_model.version = new_version

# Submit pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="nyc-taxi-xgb-training"
)

# Wait for pipeline completion
ml_client.jobs.stream(pipeline_job.name)
```

After this, `ml_client.models.get('NYCTAXI-XGB-model', version='2').path` returns a path pointing at "workspaceartifactstore", but browsing to that location shows only logs; the actual model files are in "workspaceblobstore".
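For anyone triaging: a quick way to see which datastore a returned asset path points at is to pull out the `datastores/<name>` segment of the `azureml://` URI. A minimal sketch; the example path below is made up for illustration:

```python
import re

def datastore_from_path(asset_path):
    """Extract the datastore name from an azureml:// asset path, if present."""
    match = re.search(r"/datastores/([^/]+)/", asset_path)
    return match.group(1) if match else None

# Hypothetical example path, for illustration only:
path = ("azureml://subscriptions/xxx/resourcegroups/rg/workspaces/ws/"
        "datastores/workspaceartifactstore/paths/ExperimentRun/")
print(datastore_from_path(path))  # workspaceartifactstore
```

With the bug described above, the registered model's `.path` reports `workspaceartifactstore` while the model files actually live under `workspaceblobstore`.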

To Reproduce
Create the two files above and run the pipeline.

Expected behavior
The path should point to the correct datastore, one that works with fsspec.


Additional context
I need to read the JSON file I wrote in the component back again in order to tag the model asset. For conformity with data asset tagging (#38205), I don't want to do this via MLflow even if that is possible.
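For context, the tagging step being blocked looks roughly like this: download the model folder, read back the `metadata.json` written by the training component, and apply its tags to the registered asset. A minimal sketch of the read-back helper; the Azure calls are commented out because they assume an initialized `ml_client` and a local download directory:

```python
import json
import os

def load_model_tags(model_dir):
    """Read back the metadata.json written by the training component and
    return the tag dict to apply to the registered model asset."""
    with open(os.path.join(model_dir, "metadata.json")) as f:
        metadata = json.load(f)
    return metadata.get("tags", {})

# Applying the tags (needs a workspace, so left as a comment):
# model = ml_client.models.get("NYCTAXI-XGB-model", version="2")
# model.tags.update(load_model_tags(local_model_dir))
# ml_client.models.create_or_update(model)
```

This read-back is what fails when `.path` points at the wrong datastore.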

github-actions[bot] commented 3 weeks ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @Azure/azure-ml-sdk @azureml-github.

catalinaperalta commented 3 weeks ago

Thanks for reaching out @trancenoid! Tagging the right team to help @azureml-github

achauhan-scc commented 2 weeks ago

It's a known issue and a repair item is in our backlog, but there is no ETA yet. cc: @andscho-msft, @trangevi

Suggested workaround: do not use a named output; instead, register the model as a separate step, pointing to an asset in workspaceartifactstore.
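The suggested workaround might be sketched as follows: drop the `name`/`version` assignments on the pipeline output and register the model afterwards from the job's output. This is a hedged sketch, assuming the `azureml://jobs/<job_name>/outputs/<output_name>` path format applies; the helper names are hypothetical, and only the asset names come from this issue:

```python
def job_output_model_path(job_name, output_name="nyctaxi_xgb_model"):
    """Build the azureml path for a named pipeline output
    (assumed format: azureml://jobs/<job_name>/outputs/<output_name>)."""
    return f"azureml://jobs/{job_name}/outputs/{output_name}"

def register_model_from_job(ml_client, job_name, model_name, version):
    """Register the trained model as a separate step after the pipeline run,
    instead of naming the pipeline output."""
    # Imports are local so the path helper above works without azure-ai-ml installed.
    from azure.ai.ml.entities import Model
    from azure.ai.ml.constants import AssetTypes

    model = Model(
        path=job_output_model_path(job_name),
        name=model_name,
        version=version,
        type=AssetTypes.MLFLOW_MODEL,
    )
    return ml_client.models.create_or_update(model)
```

Usage would be something like `register_model_from_job(ml_client, pipeline_job.name, "NYCTAXI-XGB-model", new_version)` after `ml_client.jobs.stream(...)` returns.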