titicaca / spark-iforest

Isolation Forest on Spark
Apache License 2.0

org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.ml.linalg.DenseVector??? #24

Closed: ZhouJiaLinmumu closed this issue 4 years ago

ZhouJiaLinmumu commented 4 years ago

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.ml.iforest.IForest$$anonfun$fit$1.apply(IForest.scala:539)
    at org.apache.spark.ml.iforest.IForest$$anonfun$fit$1.apply(IForest.scala:495)
    at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
    at org.apache.spark.ml.iforest.IForest.fit(IForest.scala:495)
    at org.apache.spark.ml.iforest.IForest.fit(IForest.scala:334)
    at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153)
    at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
    at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149)
    at org.apache.spark.examples.ml.IForestExample$.main(IForestExample.scala:58)
    at org.apache.spark.examples.ml.IForestExample.main(IForestExample.scala)
Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.ml.linalg.DenseVector

titicaca commented 4 years ago

Check your Spark version. The latest code supports only Spark 2.4.x.
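
For reference, a minimal sketch (assuming an active SparkSession named spark) to confirm the version at runtime:

print(spark.version)  # should report a 2.4.x release for the latest code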

mkaranasou commented 4 years ago

Hi @titicaca, thanks for providing us with the Python version of Isolation Forest.

I'm using PySpark version 2.4.x (tried with 2.4.0 and 2.4.4) and have the same issue:

java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.ml.linalg.DenseVector
    at org.apache.spark.ml.iforest.IForest$$anonfun$sampleFeatures$1.apply(IForest.scala:575)
    at org.apache.spark.ml.iforest.IForest$$anonfun$sampleFeatures$1.apply(IForest.scala:575)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.ml.iforest.IForest.sampleFeatures(IForest.scala:575)
    at org.apache.spark.ml.iforest.IForest$$anonfun$fit$1$$anonfun$24.apply(IForest.scala:520)
    at org.apache.spark.ml.iforest.IForest$$anonfun$fit$1$$anonfun$24.apply(IForest.scala:514)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2020-02-28 15:56:36 ERROR TaskSetManager:70 - Task 0 in stage 5.0 failed 1 times; aborting job
2020-02-28 15:56:36 ERROR Instrumentation:70 - org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
scala.Option.foreach(Option.scala:257)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.collect(RDD.scala:944)
org.apache.spark.ml.iforest.IForest$$anonfun$fit$1.apply(IForest.scala:539)
org.apache.spark.ml.iforest.IForest$$anonfun$fit$1.apply(IForest.scala:495)
org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
scala.util.Try$.apply(Try.scala:192)
org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
org.apache.spark.ml.iforest.IForest.fit(IForest.scala:495)
org.apache.spark.ml.iforest.IForest.fit(IForest.scala:334)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:745)

I am doing a very basic example:

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark_iforest.ml.iforest import IForest, IForestModel
import tempfile

conf = SparkConf()
conf.set('spark.jars', '/full/path/to/spark-iforest-2.4.0.jar')

spark = SparkSession \
        .builder \
        .config(conf=conf) \
        .appName("IForestExample") \
        .getOrCreate()

temp_path = tempfile.mkdtemp()
iforest_path = temp_path + "/iforest"
model_path = temp_path + "/iforest_model"

data = [
    {'feature1': 1., 'feature2': 0., 'feature3': 0.3, 'feature4': 0.01},
    {'feature1': 10., 'feature2': 3., 'feature3': 0.9, 'feature4': 0.1},
    {'feature1': 101., 'feature2': 13., 'feature3': 0.9, 'feature4': 0.91},
    {'feature1': 111., 'feature2': 11., 'feature3': 1.2, 'feature4': 1.91},
    {'feature1': 0., 'feature2': 0., 'feature3': 0., 'feature4': 0.1},  #  issue happens when I add this line
]

# use a VectorAssembler to gather the features as Vectors (dense)
assembler = VectorAssembler(
    inputCols=list(data[0].keys()),
    outputCol="features"
)

df = spark.createDataFrame(data)
df.printSchema()
df = assembler.transform(df)
df.show()

# note the last row: its features column comes out as a sparse vector
# +--------+--------+--------+--------+--------------------+
# |feature1|feature2|feature3|feature4|            features|
# +--------+--------+--------+--------+--------------------+
# |     1.0|     0.0|     0.3|    0.01|  [1.0,0.0,0.3,0.01]|
# |    10.0|     3.0|     0.9|     0.1|  [10.0,3.0,0.9,0.1]|
# |   101.0|    13.0|     0.9|    0.91|[101.0,13.0,0.9,0...|
# |   111.0|    11.0|     1.2|    1.91|[111.0,11.0,1.2,1...|
# |     0.0|     0.0|     0.0|     0.1|       (4,[3],[0.1])|
# +--------+--------+--------+--------+--------------------+
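
# Hedged sketch (not in the original example): confirm which concrete
# vector type VectorAssembler produced for each row.
for row in df.select('features').collect():
    print(type(row.features).__name__, row.features)
# Prints DenseVector for the first four rows and SparseVector for the
# mostly-zero row, which is the value IForest later fails to cast.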

scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
iforest = IForest(contamination=0.3, maxDepth=2)
iforest.setSeed(42)  # for reproducibility

scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
df = df.withColumn('features', F.col('scaledFeatures')).drop('scaledFeatures')
model = iforest.fit(df)  # <- raises the ClassCastException shown above

mkaranasou commented 4 years ago

(I worked around it with a UDF that builds dense vectors from the list of raw feature values:)

from pyspark.ml.linalg import Vectors, VectorUDT

# gather the raw feature columns into an array column, then convert
# the array to a DenseVector so IForest never sees a SparseVector
list_to_vector_udf = F.udf(lambda l: Vectors.dense(l), VectorUDT())
df = df.withColumn(
    'vectorized_features', F.array(*list(data[0].keys()))
)
df = df.withColumn(
    'vectorized_features',
    list_to_vector_udf('vectorized_features')
)

# scale and transform as usual.. 
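
A variant of this workaround (a sketch of my own, not from this thread) densifies the assembled features column directly, so the intermediate array column is unnecessary:

from pyspark.ml.linalg import Vectors, VectorUDT

# toArray() works for both SparseVector and DenseVector, so this
# normalizes whatever VectorAssembler produced to a DenseVector
to_dense_udf = F.udf(lambda v: Vectors.dense(v.toArray()), VectorUDT())
df = df.withColumn('features', to_dense_udf('features'))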
titicaca commented 4 years ago

Good! Yes, features need to be converted to dense vectors for the model input. I will add this note to the guide later.