combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

How to use mleap runtime #303

Closed colingoldberg closed 3 years ago

colingoldberg commented 6 years ago

In my view as a newcomer to Spark etc., the MLeap documentation is currently inadequate in describing what is needed to prepare for sending requests once the zip bundle has been posted to the server.

I have successfully trained a (naive bayes) model on a (small) data file containing categorical columns, saved it using serializeToBundle, and posted the bundle to the server. For a request to be sent (say, using curl as per your example), surely the features vector needs to be submitted in the json request! So where and how is this formulated?

In your airbnb example, the json request includes string-valued columns - and no features vector. How so? I find this confusing.

Can you point me to step-by-step help to get a test going? What's the best way to do this?

Colin Goldberg

hollinwilkins commented 6 years ago

@colingoldberg The entire ML pipeline is serialized as part of the MLeap bundle, and this includes generation of the feature vector. The airbnb pipeline goes from simple double/string inputs, performs scaling, one-hot encoding, and vector assembly before sending the result into a random forest or linear regression. This is why you don't see an actual vector as input. Depending on your pipeline, you may need to supply an actual vector as input: if, say, you were serializing just the random forest portion and doing the feature assembly before hitting the API server.
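The flow described here can be caricatured in a few lines of plain Python (the scaling constants and vocabulary below are invented stand-ins, not fitted Spark stages): because featurization is deployed alongside the model, the client sends raw column values and the "pipeline" produces the vector server-side.

```python
# Stand-ins for fitted pipeline stages; all constants here are invented.

def scale(x):
    """Stand-in for a fitted StandardScaler (hypothetical mean 50, stddev 10)."""
    return (x - 50.0) / 10.0

def one_hot(value, vocabulary):
    """Stand-in for a fitted OneHotEncoder over a known vocabulary."""
    return [1.0 if value == v else 0.0 for v in vocabulary]

def assemble(*parts):
    """Stand-in for VectorAssembler: flatten scalars and lists into one vector."""
    vec = []
    for p in parts:
        vec.extend(p if isinstance(p, list) else [p])
    return vec

# Because the whole "pipeline" is deployed, the request only carries the raw
# values 85.0 and "sunny"; the features vector is produced server-side.
features = assemble(scale(85.0), one_hot("sunny", ["sunny", "overcast", "rainy"]))
print(features)  # [3.5, 1.0, 0.0, 0.0]
```

If only the final model were serialized, the client would have to run the equivalent of these three functions itself and send the assembled vector instead.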

colingoldberg commented 6 years ago

Ok, some details of my experience may help.

The input data file (weather.csv) has the following contents:

```
"outlook","temperature","humidity","windy","play"
"sunny",85,85,"FALSE","no"
"sunny",80,90,"TRUE","no"
"overcast",83,86,"FALSE","yes"
"rainy",70,96,"FALSE","yes"
"rainy",68,80,"FALSE","yes"
"rainy",65,70,"TRUE","no"
"overcast",64,65,"TRUE","yes"
"sunny",72,95,"FALSE","no"
"sunny",69,70,"FALSE","yes"
"rainy",75,80,"FALSE","yes"
"sunny",75,70,"TRUE","yes"
"overcast",72,90,"TRUE","yes"
"overcast",81,75,"FALSE","yes"
"rainy",71,91,"TRUE","no"
```

Assuming I produced a decent model (bundle) - posted to the server:

```shell
$ docker ps
CONTAINER ID   IMAGE                           COMMAND               CREATED        STATUS        PORTS                      NAMES
f6b0ace6d5ad   combustml/mleap-serving:0.8.0   "bin/mleap-serving"   26 hours ago   Up 26 hours   0.0.0.0:65327->65327/tcp   vibrant_aryabhata
$ docker exec -it f6b0ace6d5ad bash
root@f6b0ace6d5ad:/opt/docker# cd /models/
root@f6b0ace6d5ad:/models# ls -l
total 4
-rwxr-xr-x 1 root root 1148 Nov 15 17:19 weather.custom.NaiveBayesClassification.spark.zip
```

My test request json file is (same data as first data line in the input file):

`cat /tmp/frame.nb_example.json`:

```json
{
  "schema": {
    "fields": [
      { "name": "outlook", "type": "string" },
      { "name": "temperature", "type": "double" },
      { "name": "humidity", "type": "double" },
      { "name": "windy", "type": "string" }
    ]
  },
  "rows": [["sunny", 85.0, 85.0, "FALSE"]]
}
```
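As an aside, a frame like this can also be generated programmatically, which keeps the schema fields and row values aligned by construction; a stdlib-Python sketch using the weather columns:

```python
import json

# Build an MLeap-style leap-frame request body for the weather model.
# Keeping (name, type, value) triples in one list keeps schema and row aligned.
columns = [
    ("outlook", "string", "sunny"),
    ("temperature", "double", 85.0),
    ("humidity", "double", 85.0),
    ("windy", "string", "FALSE"),
]

frame = {
    "schema": {"fields": [{"name": n, "type": t} for n, t, _ in columns]},
    "rows": [[v for _, _, v in columns]],
}

body = json.dumps(frame)
print(body)
```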

Then I get an error posting the request:

```shell
curl -XPOST -H "accept: application/json" -H "content-type: application/json" \
  -d @/tmp/frame.nb_example.json http://localhost:65327/transform
```

There was an internal server error.

```
[ERROR] [11/15/2017 21:41:06.009] [MleapServing-akka.actor.default-dispatcher-45] [MleapResource] error with request
java.lang.IllegalArgumentException: invalid field: features
    at ml.combust.mleap.core.types.StructType.indexOf(StructType.scala:193)
    at ml.combust.mleap.core.types.StructType.indexedField(StructType.scala:203)
    at ml.combust.mleap.runtime.RowUtil$.createRowSelector(RowUtil.scala:43)
    at ml.combust.mleap.runtime.RowUtil$$anonfun$createRowSelectors$2.apply(RowUtil.scala:25)
    at ml.combust.mleap.runtime.RowUtil$$anonfun$createRowSelectors$2.apply(RowUtil.scala:23)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at ml.combust.mleap.runtime.RowUtil$.createRowSelectors(RowUtil.scala:23)
    at ml.combust.mleap.runtime.LeapFrame$class.withFields(LeapFrame.scala:84)
    at ml.combust.mleap.runtime.DefaultLeapFrame.withFields(DefaultLeapFrame.scala:10)
    at ml.combust.mleap.runtime.LeapFrame$class.withOutputs(LeapFrame.scala:135)
    at ml.combust.mleap.runtime.DefaultLeapFrame.withOutputs(DefaultLeapFrame.scala:10)
    at ml.combust.mleap.runtime.transformer.MultiTransformer$class.transform(Transformer.scala:106)
    at ml.combust.mleap.runtime.transformer.classification.NaiveBayesClassifier.transform(NaiveBayesClassifier.scala:17)
    at ml.combust.mleap.serving.MleapService$$anonfun$transform$1.apply(MleapService.scala:42)
    at scala.Option.map(Option.scala:146)
    at ml.combust.mleap.serving.MleapService.transform(MleapService.scala:41)
    at ml.combust.mleap.serving.MleapResource$$anonfun$1$$anonfun$apply$1$$anonfun$apply$5$$anonfun$apply$14$$anonfun$apply$15$$anonfun$apply$16$$anonfun$apply$17.apply(MleapResource.scala:52)
    ... (akka-http / akka-stream / dispatcher framework frames elided)

[ERROR] [11/15/2017 21:41:06.009] [MleapServing-akka.actor.default-dispatcher-45] [akka.actor.ActorSystemImpl(MleapServing)] Error during processing of request: 'invalid field: features'. Completing with 500 Internal Server Error response.
```

Where do I go from here?

Colin Goldberg

hollinwilkins commented 6 years ago

@colingoldberg Are you only outputting your naive bayes classifier to the bundle? You would need to output the entire pipeline to the bundle for this to work, including the feature assemblers and any other feature extraction you are performing to produce the features field.

colingoldberg commented 6 years ago

I did see a few issues - now adjusted. This time I get an internal server error "invalid field: play". "play" is the target/class column, not included in my request.

The pyspark script is as follows:

```python
from __future__ import print_function
from mleap import pyspark
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("NaiveBayesExample")\
        .getOrCreate()

    data = spark.read.format(data_file_format).option("header", "true").load(<data_file_path>)

    categorical_columns = ['outlook', 'windy', 'play']
    feature_columns = ['ohe_outlook', 'temperature', 'humidity', 'ohe_windy']
    si_stages = [StringIndexer(inputCol=c, outputCol='si_' + c) for c in categorical_columns]
    ohe_stages = [OneHotEncoder(inputCol='si_' + c, outputCol='ohe_' + c) for c in categorical_columns]
    vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')
    all_stages = si_stages + ohe_stages + [vectorassembler_stage]
    featurePipeline = Pipeline(stages=all_stages)
    sparkFeaturePipelineModel = featurePipeline.fit(data)

    nb = NaiveBayes(featuresCol="features", labelCol="si_play")
    pipeline = [sparkFeaturePipelineModel] + [nb]
    sparkPipelineEstimator = Pipeline(stages=pipeline)
    sparkPipeline = sparkPipelineEstimator.fit(data)

    sparkPipeline.serializeToBundle("jar:file:/tmp/models/weather.custom.NaiveBayesClassification.spark.zip",
                                    sparkPipeline.transform(data))
```

Next step:

```shell
curl -v -XPUT -H "content-type: application/json" \
  -d '{"path":"/models/weather.custom.NaiveBayesClassification.spark.zip"}' \
  http://localhost:65327/model
```

Now when posting a request, I get an error "invalid field: play". But `play` (a categorical column, string-indexed to `si_play`) is the target/class column, not included in my request.

```shell
curl -XPOST -H "accept: application/json" -H "content-type: application/json" \
  -d @/tmp/frame.nb_example.json http://localhost:65327/transform
```

```
[ERROR] [11/20/2017 20:43:41.510] [MleapServing-akka.actor.default-dispatcher-16] [MleapResource] error with request
java.lang.IllegalArgumentException: invalid field: play
    at ml.combust.mleap.core.types.StructType.indexOf(StructType.scala:193)
    at ml.combust.mleap.core.types.StructType.indexedField(StructType.scala:203)
    at ml.combust.mleap.runtime.RowUtil$.createRowSelector(RowUtil.scala:43)
    ...
[ERROR] [11/20/2017 20:43:41.513] [MleapServing-akka.actor.default-dispatcher-16] [akka.actor.ActorSystemImpl(MleapServing)] Error during processing of request: 'invalid field: play'. Completing with 500 Internal Server Error response.
```

There was an internal server error.

The request json file has the contents:

```json
{
  "schema": {
    "fields": [
      { "name": "outlook", "type": "string" },
      { "name": "temperature", "type": "double" },
      { "name": "humidity", "type": "double" },
      { "name": "windy", "type": "string" }
    ]
  },
  "rows": [["sunny", 85.0, 85.0, "FALSE"]]
}
```

I have attached the saved bundle weather.custom.NaiveBayesClassification.spark.zip.

Any help is appreciated.

Thanks

Colin Goldberg

Attachment: weather.custom.NaiveBayesClassification.spark.zip

hollinwilkins commented 6 years ago

@colingoldberg If you are string indexing your prediction column, then you can't include it in your output scoring pipeline. I usually include a reverse string indexer, known as an IndexToString in spark in my scoring pipeline.
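For readers unfamiliar with these stages, a pure-Python sketch of what the StringIndexer/IndexToString pair does (the label order here is hypothetical; Spark assigns indices by label frequency): StringIndexer maps label strings to indices at training time, and IndexToString applies the inverse mapping to the model's numeric prediction at scoring time.

```python
# Pure-Python sketch of Spark's StringIndexer / IndexToString pair.
# The label order ["no", "yes"] is hypothetical (Spark orders by frequency).
labels = ["no", "yes"]

string_to_index = {s: float(i) for i, s in enumerate(labels)}  # training-time only
index_to_string = {float(i): s for i, s in enumerate(labels)}  # belongs in scoring

prediction = 1.0  # the classifier's numeric output
predicted_label = index_to_string[prediction]
print(predicted_label)  # yes
```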

colingoldberg commented 6 years ago

I'm not sure if this is the way to do it - I just added a statement to select all columns except the original "play" prediction column.

i.e.

```python
sparkPipeline = sparkPipelineEstimator.fit(data)
bundleDf = sparkPipeline.transform(data)
bdf = bundleDf.select("outlook", "si_outlook", "ohe_outlook", "temperature", "humidity",
                      "windy", "si_windy", "ohe_windy", "si_play", "features",
                      "rawPrediction", "probability", "prediction")
sparkPipeline.serializeToBundle("jar:file:/tmp/models/weather.custom.NaiveBayesClassification.spark.zip", bdf)
```

The curl PUT produces "There was an internal server error." with no other error details. (It's not very friendly.)

A look at the zip bundle indicates that there is no model.json or node.json at root level - my guess is this causes the internal error. But why is this happening?

`less /tmp/models/weather.custom.NaiveBayesClassification.spark.zip`:

```
Archive:  /tmp/models/weather.custom.NaiveBayesClassification.spark.zip
Zip file size: 8310 bytes, number of entries: 24
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_45caa6fd46370efd53de.node/
-rw---- 2.0 fat  197 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_45caa6fd46370efd53de.node/model.json
-rw---- 2.0 fat  216 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_45caa6fd46370efd53de.node/node.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_431a9f0dca28dff5849b.node/
-rw---- 2.0 fat  184 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_431a9f0dca28dff5849b.node/model.json
-rw---- 2.0 fat  212 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_431a9f0dca28dff5849b.node/node.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_4e5db44e074ec39f31e9.node/
-rw---- 2.0 fat  180 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/StringIndexer_4e5db44e074ec39f31e9.node/model.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_496ba997242ea2407fa8.node/
-rw---- 2.0 fat  136 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_496ba997242ea2407fa8.node/model.json
-rw---- 2.0 fat  220 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_496ba997242ea2407fa8.node/node.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_4a67a7259bcdae365bed.node/
-rw---- 2.0 fat  136 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_4a67a7259bcdae365bed.node/model.json
-rw---- 2.0 fat  216 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_4a67a7259bcdae365bed.node/node.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_413cac225deb1c27f250.node/
-rw---- 2.0 fat  136 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/OneHotEncoder_413cac225deb1c27f250.node/model.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/VectorAssembler_481e830ce77ec6d4ee27.node/
-rw---- 2.0 fat  624 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/VectorAssembler_481e830ce77ec6d4ee27.node/model.json
-rw---- 2.0 fat  399 bx defN 17-Nov-21 16:43 root/PipelineModel_401993ea1987de6badb9.node/VectorAssembler_481e830ce77ec6d4ee27.node/node.json
-rw---- 1.0 fat    0 bx stor 17-Nov-21 16:43 root/NaiveBayes_43609abc81571ba8c9d2.node/
-rw---- 2.0 fat  826 bx defN 17-Nov-21 16:43 root/NaiveBayes_43609abc81571ba8c9d2.node/model.json
-rw---- 2.0 fat  358 bx defN 17-Nov-21 16:43 root/NaiveBayes_43609abc81571ba8c9d2.node/node.json
24 files, 4040 bytes uncompressed, 1978 bytes compressed: 51.0%
```

At this point, I cannot send in a request as before.

A step-by-step tutorial would be helpful here - and save a lot of unnecessary time.

Thanks in anticipation.

Colin Goldberg

hollinwilkins commented 6 years ago

@colingoldberg You cannot export the string indexer of the column you are trying to predict; this column won't exist in your input data when scoring in real time, which is why you were seeing the missing field error for play. You need to make sure not to include it in the pipeline you are exporting to the MLeap bundle.

Also, don't add that select. The data frame is meant as a way to extract metadata for serializing your models.

colingoldberg commented 6 years ago

I am still confused. Some of this confusion - but not all - is because I am new to Spark.

Let's take one step at a time if you will.

I added the select in order to remove the "play" (and now also "si_play") fields from the dataframe to be saved in the bundle. If this is not the correct way to do this, I would appreciate it if you would show me the steps I need once the pipeline has been created that includes all the stages for string indexing, one-hot encoding, and vector assembling, plus naive Bayes, and before the call to serializeToBundle.

i.e.

```python
sparkPipeline = sparkPipelineEstimator.fit(data)  # as above
```

I thank you for your quick responses and a quick resolution here. It would also be helpful to include a step-by-step tutorial in your documentation.

Thanks

Colin Goldberg

henokyen commented 6 years ago

I was trying to follow this demo https://github.com/combust/mleap/wiki/Serializing-a-Spark-ML-Pipeline-and-Scoring-with-MLeap but I ran into this error:

```
Failure(java.lang.IllegalArgumentException: struct already has field: x{})
```

which is caused by this piece of code:

```scala
val s = scala.io.Source.fromURL("file:////Users/henok.s.mengistu/Documents/projects/Synchrony/data/mnist.json").mkString
val bytes = s.getBytes("UTF-8")
val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes)
```

martin-weber commented 5 years ago

> @colingoldberg If you are string indexing your prediction column, then you can't include it in your output scoring pipeline. I usually include a reverse string indexer, known as an IndexToString in spark in my scoring pipeline.

@hollinwilkins: Thanks for the hints. I ran into the same issue with the target variable, so I added the IndexToString transformer in Spark, but the "invalid field" error remained. Then I read your answer quoted below, which may explain the problem. The only thing is, I'm not sure how to remove the string indexer. Do you have a sample? Do I have to remove it in Spark when building the pipeline? Just before saving the pipeline in Spark? Or is it possible to remove this stage from the pipeline just after deserializing the bundle? What do you suggest?

> @colingoldberg You cannot export the string indexer of the column you are trying to predict, this column won't exist in your input data when scoring in realtime, hence why you were seeing the missing field error for play. You need to make sure not to include it in the pipeline you are exporting to the MLeap bundle.
>
> Also, don't add that select. The data frame is meant as a way to extract metadata for serializing your models.
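One way to picture the split being recommended in this thread: the label indexer is fitted and applied at training time only, while the feature stages, the model, and an optional IndexToString on the prediction go into the exported pipeline. A plain-Python structural sketch (the dicts only mirror Spark stage classes by name; none of this is pyspark or MLeap API):

```python
# Plain-Python sketch: which stages belong in the exported scoring pipeline,
# and which are training-only. Dicts mirror Spark stage classes by name only.
feature_cols = ["outlook", "windy"]  # categorical *feature* columns; no "play"

scoring_stages = (
    [{"stage": "StringIndexer", "in": c, "out": "si_" + c} for c in feature_cols]
    + [{"stage": "OneHotEncoder", "in": "si_" + c, "out": "ohe_" + c} for c in feature_cols]
    + [{"stage": "VectorAssembler",
        "in": ["ohe_outlook", "temperature", "humidity", "ohe_windy"], "out": "features"},
       {"stage": "NaiveBayes", "in": "features", "out": "prediction"},
       {"stage": "IndexToString", "in": "prediction", "out": "predicted_play"}]
)

# Training-only: produce the si_play label column before fitting NaiveBayes.
label_stage = {"stage": "StringIndexer", "in": "play", "out": "si_play"}

# The exported pipeline never reads "play", so scoring requests need not send it.
assert all(s["in"] != "play" for s in scoring_stages)
```

In Spark terms this would mean fitting the label indexer as a separate step before export, rather than selecting columns out of the dataframe afterwards.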

yairdata commented 5 years ago

@hollinwilkins - is there an option to supply only part of the fields in the schema and have the rest filled with default values? Something like deserializing JSON input to a POJO annotated with @JsonIgnoreProperties(ignoreUnknown = true).
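The thread does not answer this; one client-side workaround sketch (all values here are hypothetical, and this is not an MLeap feature) is to merge the caller's partial record with a defaults table before building the request frame:

```python
# Hypothetical client-side defaulting: fill omitted fields from a defaults
# table before building the request frame sent to the server.
defaults = {"outlook": "sunny", "temperature": 70.0, "humidity": 80.0, "windy": "FALSE"}

def with_defaults(partial):
    """Return a complete record; caller-supplied values win over defaults."""
    merged = dict(defaults)
    merged.update(partial)
    return merged

row = with_defaults({"temperature": 65.0})
print(row)
```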

ancasarb commented 3 years ago

Closing this issue as an effort to clean up some older issues, please re-open if there are still unanswered questions, or perhaps create new more specific questions, thank you!