h2oai / sparkling-water

Sparkling Water provides H2O functionality inside Spark cluster
https://docs.h2o.ai/sparkling-water/3.3/latest-stable/doc/index.html
Apache License 2.0

pysparkling fails to score unseen levels #5737

Open hutch3232 opened 4 months ago

hutch3232 commented 4 months ago

Apologies that the reproduction spans two languages (R and Python).

First I built an extremely basic GBM in R and saved the MOJO to S3:

library(data.table)
library(h2o)

h2o.init()

mt <- as.h2o(mtcars)

mt$engine_size <- h2o.ifelse(mt$cyl == 4, "small",
                             h2o.ifelse(mt$cyl == 6, "medium", "large"))

mod <- h2o.gbm(
  x = "engine_size",
  y = "mpg",
  training_frame = mt,
  ntrees = 1,
  max_depth = 3,
  learn_rate = 0.25,
  distribution = "gaussian",
  seed = 123
)

test <- as.h2o(
  data.frame(engine_size = c("small", "medium", "large", "extra-large"))
)

test$predict <- h2o.predict(object = mod,
                            newdata = test)
# Warning message:
# In doTryCatch(return(expr, name, parentenv, handler) : 
#     Test/Validation dataset column 'engine_size' has levels not trained on: ["extra-large"]
test
#   engine_size  predict
# 1       small 21.73388
# 2      medium 19.22987
# 3       large 19.22987
# 4 extra-large 19.22987
#
# [4 rows x 2 columns]

h2o.save_mojo(object = mod, path = "s3://<my bucket>/<my prefix>")

Then I spin up a Spark cluster, load the MOJO with pysparkling, and create a Spark DataFrame (SDF) that contains a level unseen during training. Attempting to score it fails with a UDF execution error whose root cause is the unseen categorical level.

import os
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pysparkling.ml import H2OMOJOModel

conf = SparkConf()
conf.set('spark.hadoop.fs.s3a.access.key', "<my access key>")
conf.set('spark.hadoop.fs.s3a.secret.key', "<my secret key>")

conf.set("spark.sql.view.maxNestedViewDepth", "1000")
conf.set("spark.sql.legacy.storeAnalyzedPlanForView", "True")
conf.set("spark.dynamicAllocation.enabled", True)

h2o_jvm_args = " ".join(
    [
        "-Dsys.ai.h2o.persist.s3.enable.path.style=true",
        "-Dsys.ai.h2o.persist.s3.maxErrorRetry=10",
        "-Dsys.ai.h2o.persist.s3.socketTimeout=100000",
        "-Dsys.ai.h2o.persist.s3.connectionTimeout=20000",
        "-Dsys.ai.h2o.persist.s3.maxHttpConnections=50",
    ]
)
conf.set("spark.executor.extraJavaOptions", h2o_jvm_args)
conf.set("spark.driver.extraJavaOptions", h2o_jvm_args)
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

model = H2OMOJOModel.createFromMojo("s3a://<my bucket>/<my prefix>/GBM_model_R_1721319539433_5.zip")

import pandas as pd
engine_size = ["small", "medium", "large", "extra-large"]

# Build a pandas DataFrame from the list of levels
df_pandas = pd.DataFrame({"engine_size": engine_size})

# Convert pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(df_pandas)

preds = model.transform(df)
preds.select("prediction").show()

Note especially this line from the Spark traceback:

Caused by: hex.genmodel.easy.exception.PredictUnknownCategoricalLevelException: Unknown categorical level (engine_size,extra-large)

Full traceback from Spark:
```
Py4JJavaError: An error occurred while calling o229.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6) (10.42.244.248 executor 1): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`H2OMOJOPredictionRegression$$Lambda$3188/0x0000000841557840`: (struct, double) => struct).
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
    at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: hex.genmodel.easy.exception.PredictUnknownCategoricalLevelException: Unknown categorical level (engine_size,extra-large)
    at hex.genmodel.easy.RowToRawDataConverter.convertValue(RowToRawDataConverter.java:97)
    at hex.genmodel.easy.RowToRawDataConverter.convert(RowToRawDataConverter.java:55)
    at hex.genmodel.easy.EasyPredictModelWrapper.fillRawData(EasyPredictModelWrapper.java:1045)
    at hex.genmodel.easy.EasyPredictModelWrapper.predict(EasyPredictModelWrapper.java:1050)
    at hex.genmodel.easy.EasyPredictModelWrapper.preamble(EasyPredictModelWrapper.java:1041)
    at hex.genmodel.easy.EasyPredictModelWrapper.predictRegression(EasyPredictModelWrapper.java:884)
    at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1(H2OMOJOPredictionRegression.scala:44)
    at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1$adapted(H2OMOJOPredictionRegression.scala:42)
    ... 20 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`H2OMOJOPredictionRegression$$Lambda$3188/0x0000000841557840`: (struct, double) => struct).
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
    at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: hex.genmodel.easy.exception.PredictUnknownCategoricalLevelException: Unknown categorical level (engine_size,extra-large)
    at hex.genmodel.easy.RowToRawDataConverter.convertValue(RowToRawDataConverter.java:97)
    at hex.genmodel.easy.RowToRawDataConverter.convert(RowToRawDataConverter.java:55)
    at hex.genmodel.easy.EasyPredictModelWrapper.fillRawData(EasyPredictModelWrapper.java:1045)
    at hex.genmodel.easy.EasyPredictModelWrapper.predict(EasyPredictModelWrapper.java:1050)
    at hex.genmodel.easy.EasyPredictModelWrapper.preamble(EasyPredictModelWrapper.java:1041)
    at hex.genmodel.easy.EasyPredictModelWrapper.predictRegression(EasyPredictModelWrapper.java:884)
    at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1(H2OMOJOPredictionRegression.scala:44)
    at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1$adapted(H2OMOJOPredictionRegression.scala:42)
```
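
The root cause in the trace is a value outside the model's training domain. A minimal pure-Python sketch of checking for such values before scoring is below. The helper name `unseen_levels` is hypothetical, and the training levels are hardcoded from the R code above rather than read from the MOJO.

```python
def unseen_levels(values, training_levels):
    """Return distinct non-null values absent from the training domain, in first-seen order."""
    known = set(training_levels)
    found = []
    for value in values:
        if value is not None and value not in known and value not in found:
            found.append(value)
    return found


# Assumption: the training domain is taken from the R training code above.
training_levels = ["small", "medium", "large"]
engine_size = ["small", "medium", "large", "extra-large"]

print(unseen_levels(engine_size, training_levels))  # prints ['extra-large']
```

Running a check like this on the distinct values of each categorical column would flag "extra-large" before `transform` is called.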

I would expect "extra-large" to be treated as a missing value and scored along the most populous (majority) path, as the R prediction above does, per the GBM FAQ on missing values: https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/gbm-faq/missing_values.html
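
To make that expectation concrete, here is a toy sketch of the behaviour: a level outside the training domain is mapped to missing, and a missing value follows the direction that held more training rows. This is not the actual MOJO tree. The single split, the leaf values (copied from the R output above) and the row counts (11 four-cylinder cars in `mtcars` versus 21 others) are illustrative assumptions.

```python
# Toy stump approximating the R predictions above; not the real MOJO structure.
TRAINING_LEVELS = {"small", "medium", "large"}
LEFT_LEVELS = {"small"}                          # levels routed to the left leaf
LEAVES = {"left": 21.73388, "right": 19.22987}   # leaf values taken from the R output
ROWS = {"left": 11, "right": 21}                 # assumed training rows per direction


def to_na(level):
    """Map a level outside the training domain to None (treated as missing)."""
    return level if level in TRAINING_LEVELS else None


def predict(level):
    """Score one value; missing values follow the direction with more training rows."""
    level = to_na(level)
    if level is None:
        direction = max(ROWS, key=ROWS.get)
    else:
        direction = "left" if level in LEFT_LEVELS else "right"
    return LEAVES[direction]


for lvl in ["small", "medium", "large", "extra-large"]:
    print(lvl, predict(lvl))
```

With these assumed values the sketch reproduces the four R predictions, including 19.22987 for "extra-large". As a possible interim workaround in PySpark, the `to_na` step could be approximated with `F.when(F.col("engine_size").isin("small", "medium", "large"), F.col("engine_size"))`, which yields null for non-matching rows. The Sparkling Water documentation also describes an `H2OMOJOSettings` option named `convertUnknownCategoricalLevelsToNa` that can be passed to `H2OMOJOModel.createFromMojo`; whether it resolves this case has not been verified here.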