**Closed** · ftrifoglio closed this 4 months ago
Hi ftrifoglio,

Thank you for your feedback. I think it might be because `ext_modules` may not work as intended with packages: it typically captures only what is in the `__init__.py` file rather than all the necessary modules. In your scenario, I recommend using the `code_paths` argument instead, which lets you specify the path to the folder containing the custom code you wish to import. Please give this approach a try and let us know if you run into any difficulties. We'll also update our documentation to clarify the usage of `ext_modules`.
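For what it's worth, the package behavior described above can be observed with plain Python, no Snowflake involved. A minimal, library-free sketch (`demo_pkg` is a throwaway name invented for this illustration, not part of snowflake-ml): importing a package runs only its `__init__.py`, so submodules that `__init__.py` does not import never get loaded.

```python
import os
import sys
import tempfile

# Build a throwaway package on disk: an empty __init__.py plus a
# utils.py submodule that __init__.py does NOT import.
pkg_root = tempfile.mkdtemp()
os.makedirs(os.path.join(pkg_root, "demo_pkg"))
with open(os.path.join(pkg_root, "demo_pkg", "__init__.py"), "w") as f:
    f.write("")  # empty: imports nothing
with open(os.path.join(pkg_root, "demo_pkg", "utils.py"), "w") as f:
    f.write("def column_labeller(suffix, columns):\n"
            "    return [suffix + '_' + c for c in columns]\n")

sys.path.insert(0, pkg_root)
import demo_pkg

# Importing the package executed only __init__.py, so utils.py was
# never run and is not an attribute of the package yet.
loaded_before = hasattr(demo_pkg, "utils")

import demo_pkg.utils  # explicitly importing the submodule binds it

loaded_after = hasattr(demo_pkg, "utils")
print(loaded_before, loaded_after)  # False True
```

This is consistent with the note above: anything that walks an already-imported package object sees only what `__init__.py` pulled in.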
Thank you @sfc-gh-wzhao!! Makes sense.

I've tried using `code_paths`, but I get the same error.

So I ran another test. I suspected the problem was that the reference to the function from `my_module` lives inside the serialized pipeline object `model/preproc_pipe.joblib.gz`. I got rid of that and added the import inside the `CustomModel` subclass instead. That works.
```python
X, y = make_classification()
X = pd.DataFrame(X, columns=["X" + str(i) for i in range(20)])

# log_trans = Pipeline(
#     [
#         ("impute", SimpleImputer()),
#         ("scaler", MinMaxScaler()),
#         (
#             "logger",
#             FunctionTransformer(
#                 np.log1p,
#                 feature_names_out=partial(column_labeller, "LOG"),
#             ),
#         ),
#     ]
# )
# preproc_pipe = ColumnTransformer(
#     [("log", log_trans, ["X0", "X1"])],
#     remainder="passthrough",
#     verbose_feature_names_out=False,
# )
# preproc_pipe.set_output(transform="pandas")
# preproc_pipe.fit(X, y)
# joblib.dump(preproc_pipe, "model/preproc_pipe.joblib.gz")
# # ['model/preproc_pipe.joblib.gz']

# xgb_data = xgb.DMatrix(preproc_pipe.transform(X), y)
xgb_data = xgb.DMatrix(X, y)
booster = xgb.train(dict(max_depth=5), xgb_data, num_boost_round=10)
joblib.dump(booster, "model/booster.joblib.gz")
# ['model/booster.joblib.gz']


class MyModel(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        self.model = joblib.load(self.context.path("model"))
        # self.pipeline = joblib.load(self.context.path("pipeline"))

    @custom_model.inference_api
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        from my_module.utils import column_labeller

        X = X.copy()
        # xgb_data = xgb.DMatrix(self.pipeline.transform(X))
        xgb_data = xgb.DMatrix(X)
        preds = self.model.predict(xgb_data)
        res_df = pd.DataFrame({"output": preds})
        return res_df


model_signature = ModelSignature(
    inputs=[FeatureSpec(dtype=DataType.FLOAT, name=f"X{i}") for i in range(20)],
    outputs=[FeatureSpec(dtype=DataType.FLOAT, name="output")],
)

my_model = MyModel(
    custom_model.ModelContext(
        models={},
        artifacts={
            "model": "model/booster.joblib.gz",
            # "pipeline": "model/preproc_pipe.joblib.gz",
        },
    )
)

print(my_model.predict(X))
#       output
# 0   0.968972
# 1   0.016913
# 2   0.956805
# 3   0.016913
# 4   0.016913
# ..       ...
# 95  0.984613
# 96  0.986547
# 97  0.102893
# 98  0.009444
# 99  0.016913
# [100 rows x 1 columns]

registry = Registry(session=session)
registry.log_model(
    my_model,
    model_name="MyModel",
    version_name="v1",
    python_version="3.11",
    conda_dependencies=["scikit-learn", "pandas", "xgboost"],
    signatures={"predict": model_signature},
    code_paths=["my_module"],
)
# <snowflake.ml.model._client.model.model_version_impl.ModelVersion at 0x2c0579d50>
```
This works, but my actual `my_module` contains custom scikit-learn transformers, so this workaround doesn't apply to my use case.

Is it possible that the serialized pipeline object is deserialized earlier, or in a different environment where `my_module` doesn't exist (or doesn't exist yet), causing the `ModuleNotFoundError`?
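That intuition matches how pickle (and therefore joblib) handles functions: they are serialized *by reference*, i.e. the payload stores only the module path and qualified name, and unpickling has to re-import that module. A minimal stdlib-only sketch (the `column_labeller` here is a local stand-in for the real `my_module.utils` function):

```python
import pickle

# Functions are pickled by reference: the payload stores
# "<module>.<qualname>", not the function's byte code. Unpickling
# must import that module; if it is absent from the environment,
# pickle raises ModuleNotFoundError -- the same failure mode as a
# joblib-serialized Pipeline referencing my_module.utils.
def column_labeller(suffix, self, columns):  # stand-in for my_module.utils
    return [suffix + "_" + c for c in columns]

payload = pickle.dumps(column_labeller)

# The qualified name is embedded verbatim in the serialized bytes.
print(b"column_labeller" in payload)  # True
```

So any environment that loads `model/preproc_pipe.joblib.gz` needs `my_module` importable *at load time*, which is why deferring the import into `predict` sidesteps the error.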
Hi ftrifoglio,

If your actual use case is similar to what you showed here, which is a combination of a scikit-learn transformer and an XGBoost booster, you could use `model_ref` in the context so that you don't need to handle the dumps and loads yourself, and this might help you resolve the issue. Here is an example.
```python
preproc_pipe = ...
booster_model = ...


class MyModel(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)

    @custom_model.inference_api
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        xgb_data = xgb.DMatrix(self.context.model_ref("pipeline").transform(X))
        preds = self.context.model_ref("model").predict(xgb_data)
        res_df = pd.DataFrame({"output": preds})
        return res_df


my_model = MyModel(
    custom_model.ModelContext(
        models={
            "pipeline": preproc_pipe,
            "model": booster_model,
        },
        artifacts={},
    )
)

registry = Registry(session=session)
registry.log_model(
    my_model,
    model_name="MyModel",
    version_name="v1",
    python_version="3.11",
    signatures={"predict": model_signature},
    code_paths=["my_module"],
)
```
Thanks @sfc-gh-wzhao! So helpful.

But it turns out you also need `ext_modules`: `code_paths` alone will still raise the `ModuleNotFoundError`. I suppose that's not the intended workflow, or you would have pointed that out.

Let me know if there are other tests you'd like me to run. Happy to help.
```python
from functools import partial
from importlib import import_module

import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.compose import ColumnTransformer
from sklearn.datasets import make_classification
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, MinMaxScaler
from snowflake import snowpark
from snowflake.ml.model import custom_model
from snowflake.ml.model.model_signature import DataType, FeatureSpec, ModelSignature
from snowflake.ml.registry import Registry
from snowflake.ml.version import VERSION

print(VERSION)
# 1.2.1

from my_module.utils import column_labeller

# # my_module/__init__.py
# from my_module import utils
#
# # my_module/utils.py
# def column_labeller(suffix, self, columns):
#     return [suffix + "_" + c for c in columns]
#

connection_parameters = {
    "account": ***************,
    "user": ***************,
    "password": ***************,
    "role": ***************,
    "warehouse": ***************,
    "database": ***************,
    "schema": ***************,
}
session = snowpark.Session.builder.configs(connection_parameters).create()

X, y = make_classification()
X = pd.DataFrame(X, columns=["X" + str(i) for i in range(20)])

log_trans = Pipeline(
    [
        ("impute", SimpleImputer()),
        ("scaler", MinMaxScaler()),
        (
            "logger",
            FunctionTransformer(
                np.log1p,
                feature_names_out=partial(column_labeller, "LOG"),
            ),
        ),
    ]
)
preproc_pipe = ColumnTransformer(
    [("log", log_trans, ["X0", "X1"])],
    remainder="passthrough",
    verbose_feature_names_out=False,
)
preproc_pipe.set_output(transform="pandas")
preproc_pipe.fit(X, y)
joblib.dump(preproc_pipe, "model/preproc_pipe.joblib.gz")
# ['model/preproc_pipe.joblib.gz']

xgb_data = xgb.DMatrix(preproc_pipe.transform(X), y)
booster = xgb.train(dict(max_depth=5), xgb_data, num_boost_round=10)
joblib.dump(booster, "model/booster.joblib.gz")
# ['model/booster.joblib.gz']


class MyModel(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)

    @custom_model.inference_api
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        xgb_data = xgb.DMatrix(self.context.model_ref("pipeline").transform(X))
        preds = self.context.model_ref("model").predict(xgb_data)
        res_df = pd.DataFrame({"output": preds})
        return res_df


model = joblib.load("model/booster.joblib.gz")
pipeline = joblib.load("model/preproc_pipe.joblib.gz")

my_model = MyModel(
    custom_model.ModelContext(
        models={
            "pipeline": preproc_pipe,
            "model": booster,
        },
        artifacts={},
    )
)

model_signature = ModelSignature(
    inputs=[FeatureSpec(dtype=DataType.FLOAT, name=f"X{i}") for i in range(20)],
    outputs=[FeatureSpec(dtype=DataType.FLOAT, name="output")],
)

my_module = import_module("my_module")

registry = Registry(session=session)
registry.log_model(
    my_model,
    model_name="MyModel",
    version_name="v1",
    python_version="3.11",
    signatures={"predict": model_signature},
    conda_dependencies=["scikit-learn==1.3.0", "pandas", "xgboost"],
    ext_modules=[my_module],
    code_paths=["my_module"],
)

mv = registry.get_model("MYMODEL").version("V1")
print(mv.run(X, function_name="predict"))
#       output
# 0   0.968972
# 1   0.016913
# 2   0.956805
# 3   0.016913
# 4   0.016913
# ..       ...
# 95  0.984613
# 96  0.986547
# 97  0.102893
# 98  0.009444
# 99  0.016913
# [100 rows x 1 columns]
```
Hi ftrifoglio,

Thank you for your patience, and sorry that I made a mistake in the previous example. If you use `ext_modules` in your latest example, without specifying `code_paths`, it should now work as well. However, it is not expected that specifying the module via `code_paths` fails: we investigated and found a bug that prevents some modules included in `code_paths` from being found by Python. We will fix this issue in the next release. Thank you for your feedback, and if you have any other issues, please comment here or open another issue.
Hi ftrifoglio,

We have implemented the fix, and it is included in the just-released version 1.3.0. Please give it a try and see if it fixes your issue. I am closing this issue; if you believe it still exists, please re-open it. Thank you!
I want to log a `CustomModel` that requires a custom module. Here's a reproducible example of what I'm doing, but it seems the module cannot be found.