combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

OneHotEncoder serialization failed #779

Open inardini opened 2 years ago

inardini commented 2 years ago

To whom it may concern,

I'm using mleap-pyspark to serialize the following pipeline using pyspark 3.0.2 and mleap 0.18.1.

imputer --> string_indexer --> imputer --> string_indexer --> one_hot_encoder --> vector_assembler --> scaler --> vector_assembler --> random_classifier

But I get this error:

Py4JJavaError: An error occurred while calling o99817.serializeToBundle.
: java.lang.RuntimeException: unsupported attribute for field loan_term_idx_imputed
    at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$.sizeForField(OneHotEncoderOp.scala:31)
    at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.$anonfun$store$2(OneHotEncoderOp.scala:47)
    at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.$anonfun$store$2$adapted(OneHotEncoderOp.scala:47)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.store(OneHotEncoderOp.scala:47)
    at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.store(OneHotEncoderOp.scala:37)
    at ml.combust.bundle.serializer.ModelSerializer.$anonfun$write$1(ModelSerializer.scala:87)
    at scala.util.Try$.apply(Try.scala:213)
    at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:83)
    at ml.combust.bundle.serializer.NodeSerializer.$anonfun$write$1(NodeSerializer.scala:85)
    at scala.util.Try$.apply(Try.scala:213)
    at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:81)
    at ml.combust.bundle.serializer.GraphSerializer.$anonfun$writeNode$1(GraphSerializer.scala:34)
    at scala.util.Try$.apply(Try.scala:213)
    at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
    at ml.combust.bundle.serializer.GraphSerializer.$anonfun$write$2(GraphSerializer.scala:21)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:21)
    at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
    at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
    at ml.combust.bundle.serializer.ModelSerializer.$anonfun$write$1(ModelSerializer.scala:87)
    at scala.util.Try$.apply(Try.scala:213)
    at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:83)
    at ml.combust.bundle.serializer.NodeSerializer.$anonfun$write$1(NodeSerializer.scala:85)
    at scala.util.Try$.apply(Try.scala:213)
    at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:81)
    at ml.combust.bundle.serializer.BundleSerializer.$anonfun$write$1(BundleSerializer.scala:34)
    at scala.util.Try$.apply(Try.scala:213)
    at ml.combust.bundle.serializer.BundleSerializer.write(BundleSerializer.scala:29)
    at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:34)
    at ml.combust.mleap.spark.SimpleSparkSerializer.$anonfun$serializeToBundleWithFormat$4(SimpleSparkSerializer.scala:26)
    at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
    at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
    at scala.util.control.Exception$Catch.apply(Exception.scala:228)
    at scala.util.control.Exception$Catch.either(Exception.scala:252)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
    at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
    at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
    at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
    at resource.DeferredExtractableManagedResource.$anonfun$tried$1(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:213)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at ml.combust.mleap.spark.SimpleSparkSerializer.serializeToBundleWithFormat(SimpleSparkSerializer.scala:25)
    at ml.combust.mleap.spark.SimpleSparkSerializer.serializeToBundle(SimpleSparkSerializer.scala:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Any insights?

jsleight commented 2 years ago

From looking at the source code, this error is caused by the one hot encoder op not being able to resolve the categorical size of the loan_term_idx_imputed field. It expects the field to be either nominal or binary and I think it is getting an "unresolved" type right now.
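For readers who want to see the shape of that check, here is a rough Python model of it. This is only a sketch, not MLeap's actual Scala code: the function name mirrors `sizeForField` from the stack trace, and the metadata layout (an `ml_attr` entry with `type` and `vals` keys, as seen via `df.schema[col].metadata` in PySpark) is an assumption about how Spark stores ML attributes. The category values are made up.

```python
from typing import Any, Dict


def size_for_field(name: str, metadata: Dict[str, Any]) -> int:
    """Return the number of categories recorded for a column, or raise."""
    # Assumed layout: Spark ML keeps column attributes under the "ml_attr" key.
    attr = metadata.get("ml_attr", {})
    attr_type = attr.get("type")

    if attr_type == "nominal" and "vals" in attr:
        # A nominal attribute lists its category labels; the size is their count.
        return len(attr["vals"])
    if attr_type == "binary":
        # A binary attribute always has exactly two categories.
        return 2
    # Anything else (including a column with no attribute at all) is rejected.
    raise RuntimeError(f"unsupported attribute for field {name}")


# Hypothetical metadata: an indexed column keeps its labels, while a column
# derived from it by another stage may carry no attribute at all.
indexed = {"ml_attr": {"type": "nominal", "vals": ["36", "60", "120"]}}
imputed: Dict[str, Any] = {}

print(size_for_field("loan_term_idx", indexed))
try:
    size_for_field("loan_term_idx_imputed", imputed)
except RuntimeError as err:
    print(err)
```

Under these assumptions the first column resolves to a size, and the second hits the same "unsupported attribute" branch reported in the trace.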

A few follow-up questions:

inardini commented 2 years ago

Thanks for the feedback.

Point by point:

one_hot_encoder = OneHotEncoder(dropLast=False, inputCols=categorical_imputer.getOutputCols(), outputCols=ONE_HOT_ENCODED_FEATURES, handleInvalid='keep')

Hope it helps.

jsleight commented 2 years ago

Thanks for the requested info.

What is happening is that the loan_term_idx field is considered a "nominal" attribute, but the imputer that produces loan_term_idx_imputed resets that attribute status, so the new field is "unresolved". MLeap's one hot encoder op requires the transformer to have a fixed state size, which it infers from vals.size in the metadata for each of the inputCols.

Since MLeap is on Spark v3 now, though, we could read the new OneHotEncoderModel.categorySizes property instead of inferring the sizes from metadata. We need to change this line
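To illustrate why reading sizes from the fitted model sidesteps the metadata problem, here is a minimal sketch in plain Python. It is not the MLeap patch itself: `sizes_by_column` is a hypothetical helper, and the column names and sizes are made up. In PySpark, the two inputs would come from a fitted `OneHotEncoderModel` via `getInputCols()` and `categorySizes`, which holds one size per input column, in order.

```python
from typing import Dict, Sequence


def sizes_by_column(input_cols: Sequence[str],
                    category_sizes: Sequence[int]) -> Dict[str, int]:
    """Pair each input column with the category size stored on the fitted model."""
    if len(input_cols) != len(category_sizes):
        raise ValueError("expected one category size per input column")
    # No column metadata is consulted here, so a stage that drops the
    # nominal attribute upstream no longer matters.
    return dict(zip(input_cols, category_sizes))


# Hypothetical values standing in for encoder_model.getInputCols() and
# encoder_model.categorySizes on a fitted encoder.
cols = ["loan_term_idx_imputed", "grade_idx_imputed"]
sizes = [3, 7]
print(sizes_by_column(cols, sizes))
```

The design point is that the fitted model already recorded the sizes at fit time, so serialization does not need to re-derive them from the DataFrame schema.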

So there are two paths forward for you:

  1. Wait for this fix to go through. If you're up for it, I'm happy to review and merge a PR 😄. You'd then need to wait for the next release or use a snapshot.
  2. Alter your pipeline. I think you can either put the imputer before the string indexer, or maybe even remove the imputer altogether and let the one hot encoder's handleInvalid='keep' setting handle the values that would otherwise be imputed.

Cheers!