titicaca / spark-iforest

Isolation Forest on Spark
Apache License 2.0

spark-iforest for Streaming data ? #18

Closed bhushanbalki closed 5 years ago

bhushanbalki commented 5 years ago

Hi,

We tried spark-iforest on bulk data and it seems to work fine, but we would like to make it work for streaming data. Let us know how we can get it to work.

Thanks, Bhushan

titicaca commented 5 years ago

I'm not very familiar with Spark Streaming, but I think you can refer to StreamingKMeans if you want to apply it in Spark Streaming.

https://github.com/apache/spark/blob/branch-2.4/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala

spicoflorin commented 5 years ago

Hi! I have used this algorithm in a streaming fashion, but it doesn't work properly. Some internals (the threshold parameter) are computed for each batch (I'll show below how I implemented it). Because of this, all the values are predicted as normal (0).

Maybe @titicaca can help with how we can solve this better.

Step 1. Load an already trained model (trained with the existing algorithm in a batch fashion):

val model = PipelineModel.load("model/mySavedModel")

Step 2. Use the already trained algorithm in the foreachBatch method of the stream:

yourStreamDataSet.select("yourFeatureColumnThatYouHaveProvidedForTraining").writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
      // Score the micro-batch with the loaded model, then publish the rows to Kafka as JSON
      model.transform(batchDF).select("prediction", "yourFeatureColumnThatYouHaveProvidedForTraining")
        .selectExpr("CAST(\"keyMessage\" AS STRING) AS key", "CAST(to_json(struct(*)) AS STRING) AS value")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", broker)
        .option("topic", "sparkIFPredict")
        .save()
    }) .start().awaitTermination()

The wrong predictions (all the values are normal) may be caused by the following code in IForest.scala. Any help will be highly appreciated.

override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    val numSamples = dataset.count()
    val possibleMaxSamples =
      if ($(maxSamples) > 1.0) $(maxSamples) else ($(maxSamples) * numSamples)
    val bcastModel = dataset.sparkSession.sparkContext.broadcast(this)
    // calculate anomaly score
    val scoreUDF = udf { (features: Vector) =>
      {
        val normFactor = avgLength(possibleMaxSamples)
        val avgPathLength = bcastModel.value.calAvgPathLength(features)
        Math.pow(2, -avgPathLength / normFactor)
      }
    }
    // append a score column
    val scoreDataset = dataset.withColumn($(anomalyScoreCol), scoreUDF(col($(featuresCol))))

    if (threshold < 0) {
      logger.info("threshold is not set, calculating the anomaly threshold according to param contamination..")
      threshold = scoreDataset.stat.approxQuantile($(anomalyScoreCol),
        Array(1 - $(contamination)), $(approxQuantileRelativeError))(0)

    }

    // set anomaly instance label 1
    val predictUDF = udf { (anomalyScore: Double) =>
      if (anomalyScore > threshold) 1.0 else 0.0
    }
    scoreDataset.withColumn($(predictionCol), predictUDF(col($(anomalyScoreCol))))
  }
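
For reference, the score in the transform method quoted above has the standard isolation forest form 2^(-avgPathLength / normFactor). The following is a minimal, Spark-free sketch of that normalisation. It assumes that avgLength follows the textbook definition of the average path length of an unsuccessful binary search tree lookup; the object and method names are illustrative and are not the library's API.

```scala
object ScoreSketch {
  // Euler-Mascheroni constant, used to approximate the harmonic number H(n - 1)
  private val EulerConstant = 0.5772156649

  // c(n): average path length of an unsuccessful BST search over n points
  // (assumed to match what avgLength computes in the quoted code)
  def avgLength(n: Double): Double =
    if (n > 2) 2 * (math.log(n - 1) + EulerConstant) - 2 * (n - 1) / n
    else if (n == 2) 1.0
    else 0.0

  // s = 2^(-avgPathLength / c(n)); values close to 1 indicate anomalies
  def anomalyScore(avgPathLength: Double, n: Double): Double =
    math.pow(2, -avgPathLength / avgLength(n))
}
```

In the quoted code, possibleMaxSamples is derived from dataset.count() whenever maxSamples is at most 1.0, so under this sketch the same path length can receive a different score when the batch size changes: ScoreSketch.anomalyScore(6.0, 50) and ScoreSketch.anomalyScore(6.0, 256) do not return the same value.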

titicaca commented 5 years ago

You just need to set the threshold value before predicting (transforming) data.

You can refer to the following code in the transform method:

 if (threshold < 0) {
      logger.info("threshold is not set, calculating the anomaly threshold according to param contamination..")
      threshold = scoreDataset.stat.approxQuantile($(anomalyScoreCol),
        Array(1 - $(contamination)), $(approxQuantileRelativeError))(0)
    }

If the threshold param is not set, it is recalculated for every batch from that batch's data and the contamination param.

spicoflorin commented 5 years ago

Thanks for the response. Regarding this threshold, I would like to clarify the following:

  1. Since I have an already trained model, why is this threshold not saved in the model itself?
  2. I loaded the already trained model via val model = PipelineModel.load("model/mySavedModel").
    How can I set the threshold value, which is an attribute of IForestModel?
  3. In batch mode (no streaming) the threshold is computed once. In streaming mode it is recomputed on every batch, so the predictions may be affected. The threshold for one batch may differ from the previous ones, so the algorithm may predict differently for the same feature value. For example, a temperature of 90 degrees in winter might be predicted as abnormal in the first batch but as normal in the next.
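
The effect described in point 3 can be reproduced without Spark. The sketch below is only an illustration: it uses an exact nearest-rank quantile as a stand-in for approxQuantile, and the object and method names are hypothetical.

```scala
object ThresholdDrift {
  // Exact nearest-rank quantile; a simplified stand-in for approxQuantile
  def quantile(scores: Seq[Double], q: Double): Double = {
    val sorted = scores.sorted
    val rank = math.ceil(q * sorted.length).toInt
    sorted(math.min(sorted.length - 1, math.max(0, rank - 1)))
  }

  // Same labelling rule as the quoted predictUDF: 1.0 when score > threshold
  def predict(scores: Seq[Double], threshold: Double): Seq[Double] =
    scores.map(s => if (s > threshold) 1.0 else 0.0)

  // Threshold recomputed from the batch itself, as happens when it is unset
  def predictPerBatch(scores: Seq[Double], contamination: Double): Seq[Double] =
    predict(scores, quantile(scores, 1 - contamination))
}
```

With contamination 0.25, the batch Seq(0.40, 0.42, 0.45, 0.70) gets a threshold of 0.45 and labels the score 0.70 as an anomaly, whereas the batch Seq(0.70, 0.72, 0.75, 0.90) gets a threshold of 0.75 and labels the same score 0.70 as normal.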

titicaca commented 5 years ago

There are two params for finding anomalies. One is the anomaly score threshold; the other is contamination, which defines the proportion of anomalies in the given dataset. By default, the threshold is not set, and the contamination param is used to calculate the threshold for the dataset. This suits unsupervised learning, where the goal is to find the anomalies in a whole dataset in a single batch.

I think in your streaming case, you have to train your model on the training dataset in advance and then predict on the streaming data. So you have to tune and find the best threshold value for the model during training, and keep this threshold fixed during the prediction phase in streaming.
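
As a rough, Spark-free illustration of that workflow, the sketch below derives a threshold once from training scores and then applies it unchanged to later batches. The names fitThreshold and label are hypothetical, and the nearest-rank quantile is a simplification of approxQuantile; picking the quantile from contamination is only one possible way to tune the value.

```scala
object FixedThreshold {
  // Derive the threshold once, from the scores of the training data
  def fitThreshold(trainScores: Seq[Double], contamination: Double): Double = {
    require(trainScores.nonEmpty, "need at least one training score")
    val sorted = trainScores.sorted
    val rank = math.ceil((1 - contamination) * sorted.length).toInt
    sorted(math.min(sorted.length - 1, math.max(0, rank - 1)))
  }

  // Apply the frozen threshold to any later batch of scores
  def label(batchScores: Seq[Double], threshold: Double): Seq[Double] =
    batchScores.map(s => if (s > threshold) 1.0 else 0.0)
}
```

Because the threshold no longer depends on the contents of each batch, a given score always receives the same label, whichever batch it arrives in.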

There are two ways to set the threshold param in your case. You can set the threshold before saving the model, in your training or param tuning phase. Or you can reset the value after loading the pipeline model. Regarding your 2nd question, this answer may help.

spicoflorin commented 5 years ago

Many thanks for the explanations. You are right. I used the batch algorithm to train the model and saved it, and I obtained the threshold. I didn't work out how to save it in the model itself. What I managed to do was to set it when running the prediction pipeline:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.iforest.IForestModel

val model = PipelineModel.load("myIsoModelPath")
val isoModel = model.stages.last.asInstanceOf[IForestModel]
isoModel.setThreshold(0.7379143084768235d)