JIRA Issue Migration Info

- Jira Issue: SW-2758
- Assignee: Marek Novotny
- Reporter: Eric Gudgion
- State: Resolved
- Fix Version: 3.38.0.3-1
- Attachments: N/A
- Development PRs: Available
- Created Date: 2022-11-10T06:47:35.283-0800

Linked PRs from JIRA
Acxiom is a prospect using Driverless AI, but they need to deploy the Java MOJO to score on Spark (they also have a test using Sparkling Water). The scoring cluster is 640 nodes, and they have 4,000 models to score in batches of 100 models at a time.
Per-row scoring latency in Spark is important to them because they use AMR and need to be very careful about execution costs.
Running run_example.sh from the mojo.zip gives ~1.3 ms per row, but the same MOJO in Spark takes ~18.4 ms per row.
java -Xmx5g -Dsys.ai.h2o.mojos.parser.csv.separator='|' -Dsys.ai.h2o.mojos.exposedInputs=input.mriid,X611953 -Dai.h2o.mojos.runtime.license.file=license.sig -cp mojo2-runtime.jar ai.h2o.mojos.ExecuteMojo x611953.mojo dormant_done.csv >x611953.java.out
Mojo load time: 0 usec, Mojo scoring/output time: 1 min 41.607 sec, Per row time: 1.324 msec, Number of rows: 76720, Total time: 1 min 41.607 sec
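As a sanity check, the per-row figure follows directly from the totals in the ExecuteMojo output above (1 min 41.607 sec over 76,720 rows); a minimal sketch of that arithmetic:

```scala
// Recompute the per-row latency reported by the standalone ExecuteMojo run:
// total scoring time of 1 min 41.607 sec over 76,720 rows.
object JavaRunLatency {
  def main(args: Array[String]): Unit = {
    val totalMs  = (1 * 60 + 41.607) * 1000.0 // 101,607 ms total
    val rows     = 76720
    val msPerRow = totalMs / rows
    println(f"Per row time: $msPerRow%.3f msec") // matches the reported 1.324 msec
  }
}
```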
/opt/ssbe/ssbe-spark/bin/spark-shell --master 'local[1]' --driver-memory 8g --executor-memory 8g --jars license.sig,sparkling-water-3.38.0.1-1-3.2/jars/sparkling-water-assembly-scoring_2.12-3.38.0.1-1-3.2-all.jar
Sparkling: Records = 76720, Time = 1411794, ms per record = 18.401903023983316
import ai.h2o.sparkling.ml.models.{H2OMOJOPipelineModel, H2OMOJOSettings}
import org.apache.spark.sql.functions.{col, format_number}
val inputFilename = "file:///mnt/ifs80001/ubx/sfurss/temp/models/dormant_done.csv"
val mojoFilename = "file:///mnt/ifs80001/ubx/sfurss/temp/models/x611953.mojo"
val input = spark.read.option("header",true).option("sep","|").csv(inputFilename)
val settings = H2OMOJOSettings(namedMojoOutputColumns = false)
val mojo = H2OMOJOPipelineModel.createFromMojo(mojoFilename, settings)
val requiredFields = "input.mriid" +: mojo.getFeaturesCols().asInstanceOf[Array[String]]
val limitedCols = input.select(requiredFields.map(c => col("`" + c + "`")): _*)
limitedCols.printSchema()
val output = mojo.transform(limitedCols).withColumn("probability", format_number(col("prediction.preds").getItem(1),16)).drop("prediction")
val start = System.currentTimeMillis()
val count = spark.sparkContext.longAccumulator("CountAccumulator")
output.foreach(it => {
count.add(1)
});
val time = System.currentTimeMillis() - start
println("Records = " + count.value + ", Time = " + time + ", ms per record = " + (time.toDouble / count.value))
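Putting the two reported per-row times side by side shows the size of the gap being reported; a small sketch using the numbers from the runs above:

```scala
// Compare the two reported per-row latencies (values copied from the
// runs above). The ratio is the slowdown this issue is about.
object LatencyGap {
  def main(args: Array[String]): Unit = {
    val javaMsPerRow  = 1.324               // standalone ExecuteMojo run
    val sparkMsPerRow = 18.401903023983316  // Sparkling Water run
    val slowdown      = sparkMsPerRow / javaMsPerRow
    println(f"Spark path is $slowdown%.1fx slower per row") // roughly 13.9x
  }
}
```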
The test model / zip is available here: https://h2o-se-fileshare.s3.amazonaws.com/EricG/acxiom/mojo_x611953.zip