huawei-noah / streamDM

Stream Data Mining Library for Spark Streaming
http://streamdm.noahlab.com.hk/
Apache License 2.0
492 stars 147 forks source link

Bug Report in TreeCoreset-StreamKM-BucketManager #100

Open ioanna-ki opened 6 years ago

ioanna-ki commented 6 years ago

Bug Report TreeCoreset-StreamKM-BucketManager

Expected behavior

A TreeCoreset contains the methods to extract an m-sized array of Examples from an n-sized stream of Examples where n>m.

Observed behavior

The extracted coreset contains many duplicate Examples with weight = 0.0

I used a custom file as input since some of the readers have issues. I attached the file below.

Command line

./spark.sh "ClusteringTrainEvaluate -c (StreamKM -s 100 -w 1000) -s (SocketTextStreamReader)" 1> result.txt 2> log_streamKM.log

To reproduce the issue several changes needs to be done.

  1. In bucketManager method getCoreset needs to change, because break obviously does not work so I used an "isFound" flag instead (any other suggestions instead of a flag are welcomed) def getCoreset: Array[Example] = { if(buckets(L-1).isFull) { buckets(L-1).points.toArray }else { var i = 0 var coreset = Array[Example]() for(i <- 0 until L) { if(buckets(i).isFull) { coreset = buckets(i).points.toArray break } } val start = i+1 for(j <- start until L) { val examples = buckets(j).points.toArray ++ coreset val tree = new TreeCoreset coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize), new Array[Example](0)) } coreset } } }

  2. You have to print the output of the coreset in StreamKM using Example.toString and Example.weight to see that some Examples are changing their weight values to 0.0 even if 1.0 is the default value. (Thats probably happening in line 187 of TreeCoreset.scala where the leaf node has no elements so e.n is zero)

  3. I did a pull request in StreamKM but the issues are still there. Since method getCoreset is called every time a new Example input comes (a really slow process.) I checked moa's version in java (line 128) and there is a comment about calling getCoreset after the streaming process is finished. So in spark I'am guessing we should call it when numInstances are equals to rdd.count divided with the number of workers or the repartition of the input.

  4. in TreeCoreset.scala line 86 funCost has sometimes zero value so when costOfPoint is divided by funcost sum is NaN

    Any thoughts on how to solve the problem with the duplicates? Thanks in advance.

txt file with 1000 randomly chosen points random2.txt

Infrastructure details

Java Version: 1.8.0_144
Scala Version: 2.11.6
Spark version: 2.2.0
OS version: ubuntu 16.4
Spark Standalone cluster mode