linkedin / isolation-forest

A Spark/Scala implementation of the isolation forest unsupervised outlier detection algorithm with support for exporting in ONNX format.

Issue saving the model #28

Closed · sridhar closed this 2 years ago

sridhar commented 3 years ago
  1. I've built the library using: ./gradlew clean build -x test -PsparkVersion=2.4.3 -PscalaVersion=2.11.12
  2. I have Spark running via spark-2.4.3-hadoop2.6/sbin/start-all.sh
  3. I have the following code, which I build with sbt (example adapted from the README):
import com.linkedin.relevance.isolationforest._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("spark://ai-monster:7077")
      .appName("Isol")
      .config("spark.jars", "lib/isolation-forest_2.4.3_2.11-2.0.6.jar")
      .getOrCreate()

    // Load the shuttle dataset; it has no header and uses '#' for comments.
    val rawData = spark.read
      .format("csv")
      .option("comment", "#")
      .option("header", "false")
      .option("inferSchema", "true")
      .load("resources/shuttle.csv")

    val cols = rawData.columns
    val labelCol = cols.last

    // Assemble every column except the label into a single feature vector.
    val assembler = new VectorAssembler()
      .setInputCols(cols.slice(0, cols.length - 1))
      .setOutputCol("features")
    val data = assembler
      .transform(rawData)
      .select(col("features"), col(labelCol).as("label"))

    val contamination = 0.1
    val isolationForest = new IsolationForest()
      .setNumEstimators(100)
      .setBootstrap(false)
      .setMaxSamples(256)
      .setMaxFeatures(1.0)
      .setFeaturesCol("features")
      .setPredictionCol("predictedLabel")
      .setScoreCol("outlierScore")
      .setContamination(contamination)
      .setContaminationError(0.01 * contamination)
      .setRandomSeed(1)

    val isolationForestModel = isolationForest.fit(data)

    // Score the training data
    val dataWithScores = isolationForestModel.transform(data)

    isolationForestModel.save("/tmp/mymodel") // <==== ERROR here

    dataWithScores.take(5).foreach(println)
    dataWithScores.printSchema()
  }
}

The above run fails at the model save stage with the following error:

21/06/03 18:57:27 INFO IsolationForestModelReadWrite$IsolationForestModelWriter: Saving IsolationForestModel tree data to path /tmp/mymodel/data
[error] (run-main-0) java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
[error] java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
[error]         at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
[error]         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
[error]         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
[error]         at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImplHelper(IsolationForestModelReadWrite.scala:263)
[error]         at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImpl(IsolationForestModelReadWrite.scala:242)
[error]         at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
[error]         at org.apache.spark.ml.util.MLWritable$class.save(ReadWrite.scala:306)
[error]         at com.linkedin.relevance.isolationforest.IsolationForestModel.save(IsolationForestModel.scala:20)
[error]         at Main$.main(Main.scala:63)
[error]         at Main.main(Main.scala)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroFileFormat.DefaultSource
[error]         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] stack trace is suppressed; run last Compile / bgRun for the full output
21/06/03 18:57:27 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
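
A quick way to check whether the Avro data source is even visible to the driver JVM (a generic diagnostic, not something from this thread) is to probe for the class directly in a spark-shell or at the top of main:

// Throws ClassNotFoundException on the driver if spark-avro is missing from
// the classpath; note this says nothing about the executor classpath.
Class.forName("org.apache.spark.sql.avro.AvroFileFormat")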

Now, since spark-avro has already been compiled into the linkedin jar, I shouldn't have to include it again. Still, I tried multiple things:

  1. Converted the Gradle dependencies from compile to implementation, but that didn't help.
  2. Added spark-avro to the sbt dependency list when compiling the above code (see the build.sbt sketch after this trace). Then I get:
[error] (run-main-0) org.apache.spark.SparkException: Job aborted.
[error] org.apache.spark.SparkException: Job aborted.
[error]         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
[error]         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
[error]         at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
[error]         at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
[error]         at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
[error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
[error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
[error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
[error]         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error]         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
[error]         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
[error]         at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
[error]         at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
[error]         at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
[error]         at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
[error]         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
[error]         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
[error]         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
[error]         at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
[error]         at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
[error]         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
[error]         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
[error]         at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImplHelper(IsolationForestModelReadWrite.scala:263)
[error]         at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImpl(IsolationForestModelReadWrite.scala:242)
[error]         at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
[error]         at org.apache.spark.ml.util.MLWritable$class.save(ReadWrite.scala:306)
[error]         at com.linkedin.relevance.isolationforest.IsolationForestModel.save(IsolationForestModel.scala:20)
[error]         at Main$.main(Main.scala:63)
[error]         at Main.main(Main.scala)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 187, 10.40.14.5, executor 0): java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroOutputWriterFactory
[error]         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error]         at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[error]         at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[error]         at java.lang.Class.forName0(Native Method)
[error]         at java.lang.Class.forName(Class.java:348)
[error]         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
[error]         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
[error]         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
[error]         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
[error]         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
[error]         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
[error]         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
[error]         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
[error]         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
[error]         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
...
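
For context, the sbt change in step 2 might look something like the following build.sbt; the thread does not show the actual build file, so the project name and dependency list here are assumptions:

// build.sbt (hypothetical sketch; the isolation-forest jar itself was
// supplied locally via spark.jars rather than as a managed dependency)
name := "isol-example"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.4.3",
  "org.apache.spark" %% "spark-mllib" % "2.4.3",
  "org.apache.spark" %% "spark-avro"  % "2.4.3"
)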

I'm using the top-of-tree source. Am I missing something here? Do I need to do something specific, or is there a bug in the library's packaging script?

Note that the above code works without the save invocation.

sridhar commented 3 years ago

The issue goes away when I manually copy the spark-avro jar to the executors. This should have been automatic, though, since I'm doing the same for the main jar: .config("spark.jars","lib/isolation-forest_2.4.3_2.11-2.0.6.jar,lib/spark-avro_2.11-2.4.3.jar")

jverbus commented 3 years ago

The spark-avro module is external and isn't included by default.

https://spark.apache.org/docs/latest/sql-data-sources-avro.html

As you point out, you can add the jar manually or specify the Maven coordinates, e.g.:

spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.3,com.linkedin.isolation-forest:isolation-forest_2.4.3_2.11:2.0.4
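
When the session is built programmatically, as in the code above, the same coordinates can instead be supplied via the spark.jars.packages config, which resolves the jars from Maven and ships them to the executors. A minimal sketch, assuming the thread's Spark 2.4.3 / Scala 2.11 setup and reusing the master URL and app name from the original snippet:

import org.apache.spark.sql.SparkSession

// Sketch: same dependencies as the spark-shell command above, resolved from
// Maven at session startup and distributed to the executors automatically.
val spark = SparkSession.builder
  .master("spark://ai-monster:7077")
  .appName("Isol")
  .config("spark.jars.packages",
    "org.apache.spark:spark-avro_2.11:2.4.3," +
      "com.linkedin.isolation-forest:isolation-forest_2.4.3_2.11:2.0.4")
  .getOrCreate()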