huawei-noah / streamDM

Stream Data Mining Library for Spark Streaming
http://streamdm.noahlab.com.hk/
Apache License 2.0

Issue with Kafka Reader #95

Closed saraAlizadeh closed 6 years ago

saraAlizadeh commented 6 years ago

Hello, I have an issue with Kafka. I ran a command that read the dataset using FileReader and it worked fine; then I tried to do the same using KafkaReader, but it fails. The log output is shown below:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/05/26 07:01:30 INFO SparkContext: Running Spark version 2.1.0
18/05/26 07:01:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/26 07:01:31 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.17.177 instead (on interface eno33557248)
18/05/26 07:01:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/05/26 07:01:31 INFO SecurityManager: Changing view acls to: root
18/05/26 07:01:31 INFO SecurityManager: Changing modify acls to: root
18/05/26 07:01:31 INFO SecurityManager: Changing view acls groups to:
18/05/26 07:01:31 INFO SecurityManager: Changing modify acls groups to:
18/05/26 07:01:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/05/26 07:01:31 INFO Utils: Successfully started service 'sparkDriver' on port 44017.
18/05/26 07:01:31 INFO SparkEnv: Registering MapOutputTracker
18/05/26 07:01:31 INFO SparkEnv: Registering BlockManagerMaster
18/05/26 07:01:31 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/05/26 07:01:31 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/05/26 07:01:31 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-0a7ed5a0-688d-4684-8962-9f9c398dc979
18/05/26 07:01:31 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
18/05/26 07:01:31 INFO SparkEnv: Registering OutputCommitCoordinator
18/05/26 07:01:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/05/26 07:01:32 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.17.177:4040
18/05/26 07:01:32 INFO SparkContext: Added JAR file:/root/Downloads/spark-streaming-kafka-0-10_2.10-2.1.0.jar at spark://192.168.17.177:44017/jars/spark-streaming-kafka-0-10_2.10-2.1.0.jar with timestamp 1527332492071
18/05/26 07:01:32 INFO SparkContext: Added JAR file:/root/streamDM/scripts/../target/scala-2.10/streamdm-spark-streaming-_2.10-0.2.jar at spark://192.168.17.177:44017/jars/streamdm-spark-streaming-_2.10-0.2.jar with timestamp 1527332492072
18/05/26 07:01:32 INFO Executor: Starting executor ID driver on host localhost
18/05/26 07:01:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45190.
18/05/26 07:01:32 INFO NettyBlockTransferService: Server created on 192.168.17.177:45190
18/05/26 07:01:32 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/05/26 07:01:32 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.17.177, 45190, None)
18/05/26 07:01:32 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.17.177:45190 with 366.3 MB RAM, BlockManagerId(driver, 192.168.17.177, 45190, None)
18/05/26 07:01:32 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.17.177, 45190, None)
18/05/26 07:01:32 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.17.177, 45190, None)
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
  at org.apache.spark.streamdm.streams.KafkaReader.getExamples(KafkaReader.scala:62)
  at org.apache.spark.streamdm.tasks.EvaluatePrequential.run(EvaluatePrequential.scala:71)
  at org.apache.spark.streamdm.streamDMJob$.main(streamDMJob.scala:56)
  at org.apache.spark.streamdm.streamDMJob.main(streamDMJob.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 13 more
18/05/26 07:01:32 INFO SparkContext: Invoking stop() from shutdown hook
18/05/26 07:01:32 INFO SparkUI: Stopped Spark web UI at http://192.168.17.177:4040
18/05/26 07:01:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/05/26 07:01:32 INFO MemoryStore: MemoryStore cleared
18/05/26 07:01:32 INFO BlockManager: BlockManager stopped
18/05/26 07:01:32 INFO BlockManagerMaster: BlockManagerMaster stopped
18/05/26 07:01:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/05/26 07:01:32 INFO SparkContext: Successfully stopped SparkContext
18/05/26 07:01:32 INFO ShutdownHookManager: Shutdown hook called
18/05/26 07:01:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-23e4273b-e33d-4dbf-ace4-72597addd9b6

Infrastructure details

Also, I imported org.apache.spark:spark-streaming-kafka-0-10_2.10:2.1.0 in my terminal using --packages and in my ~/.bashrc using an export. It didn't work, so I changed my spark.sh in the scripts directory to:

$SPARK_HOME/bin/spark-submit \
  --jars /root/Downloads/spark-streaming-kafka-0-10_2.10-2.1.0.jar \
  --class "org.apache.spark.streamdm.streamDMJob" \
  --master local[2] \
  ../target/scala-2.10/streamdm-spark-streaming-_2.10-0.2.jar \
  $1

but the same error appears every time. Can anyone help me with this?
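
For reference, the class reported missing in the stack trace, org.apache.spark.streaming.kafka.KafkaUtils, is the one shipped in the spark-streaming-kafka-0-8 artifact; the 0-10 artifact added above exposes its KafkaUtils under org.apache.spark.streaming.kafka010 instead, so the jar on the classpath never contains the class that this build's KafkaReader references. A minimal spark-submit sketch along those lines, assuming 0-8 artifact coordinates for Spark 2.1.0 / Scala 2.10 (not verified against this exact setup):

$SPARK_HOME/bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.10:2.1.0 \
  --class "org.apache.spark.streamdm.streamDMJob" \
  --master local[2] \
  ../target/scala-2.10/streamdm-spark-streaming-_2.10-0.2.jar \
  $1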

hmgomes commented 6 years ago

Hi @saraAlizadeh

Can you provide more details about the specific command you executed and the Kafka stream used? A toy example that replicates the error would be enough.

Best Regards,

Heitor

saraAlizadeh commented 6 years ago

Hi @hmgomes This command runs fine:

./spark.sh "200 EvaluatePrequential -l (meta.Bagging -l trees.HoeffdingTree) -s (FileReader -f covtypeNorm.arff -k 5810 -d 10 -i 581012) -e (BasicClassificationEvaluator -c -m) -h" 1> result_cov.txt 2> log_cov.log

but this one produces the error:

./spark.sh "200 EvaluatePrequential -l (meta.Bagging -l trees.HoeffdingTree) -s (KafkaReader -b localhost:9092 -t streamDM-test) -e (BasicClassificationEvaluator -c -m) -h" 1> result_cov.txt 2> log_cov.log

I've already started ZooKeeper and the Kafka server, and created a topic (streamDM-test).
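
For context, a typical setup sequence with a stock Kafka distribution of that era looks roughly like this (paths, flags, and the way the dataset is fed into the topic are illustrative and depend on the Kafka version installed):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streamDM-test
# feed the instance lines of the dataset into the topic, e.g.:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streamDM-test < covtypeNorm.arff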

I can test Kafka with Spark 1.6.3, but streamDM does not compile correctly against that version of Spark. Can you suggest a set of versions that are compatible with each other?

hmgomes commented 6 years ago

Hi @saraAlizadeh

You can try a version of streamDM from before the upgrade to Spark 2.2, such as one of these:

commit 9ff5d0c8b9b22a13ba0d389a3cdf3a62c1e9e30f or commit 942e4000e7e657c96e6d0c51a04f5051038e7b40

These should be compatible with Spark 1.6 (I've just tried the latest one with Spark 1.6.1). Let me know if it works.
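
If it helps, checking out one of those revisions looks like this (a sketch assuming a fresh clone; building with sbt package is an assumption about the build setup at those commits):

git clone https://github.com/huawei-noah/streamDM.git
cd streamDM
git checkout 9ff5d0c8b9b22a13ba0d389a3cdf3a62c1e9e30f
sbt package
# then run via scripts/spark.sh as before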

Best Regards,

Heitor

saraAlizadeh commented 6 years ago

Hi @hmgomes Sorry for the delay, I was really busy working on this issue. I compiled commit 9ff5d0c and, thanks, it works: streamDM now gets records from the Kafka topic and produces results. But there is something weird about them. Look at my result file:

Accuracy,Recall,Precision,F(beta=1.0)-score,Specificity,TP,FN,FP,TN
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0
1.000,1.000,1.000,1.000,NaN,1,0,0,0

And this is part of the result file created by streamDM when reading the data from a file:

Accuracy,Recall-avg-macro,Precision-avg-macro,F(beta=1.0)-score-avg-macro,
0.260,0.143,0.260,0.184,
0.156,0.279,0.130,0.177,
0.514,0.408,0.488,0.445,
0.715,0.294,0.266,0.280,
0.756,0.749,0.511,0.608,
0.731,0.439,0.476,0.457,
0.799,0.465,0.605,0.526,
0.862,0.777,0.916,0.841,
0.933,0.861,0.954,0.905,
0.856,0.768,0.914,0.835,
0.795,0.733,0.821,0.774,
0.892,0.810,0.930,0.865,
0.809,0.470,0.861,0.608,
0.815,0.582,0.700,0.636,
0.805,0.574,0.749,0.650,
0.822,0.540,0.588,0.563,
0.887,0.453,0.740,0.562,
0.867,0.522,0.883,0.656,
0.916,0.566,0.890,0.692,
0.908,0.557,0.818,0.663,

Do they look alright to you?

hmgomes commented 6 years ago

Hi @saraAlizadeh

No worries, I am also very busy these days. Regarding your question: they do not look right. You would be expected to obtain the same results, because essentially it is the same data, right? Can you verify whether the input and output are properly parsed after streamDM gets the records? I.e., perhaps the class label is not properly set.

Best Regards, Heitor

saraAlizadeh commented 6 years ago

Well, yes, the results were supposed to be the same. Even allowing for Kafka changing the order of the records, it still makes no sense for all accuracies to be 1. I haven't parsed the records myself; I just executed the code, in which EvaluatePrequential gets examples from the Kafka reader. But I'm really suspicious about this: what exactly are those examples? I couldn't debug it in IntelliJ to see the exact values and their types when I use Kafka. I think printing what Kafka passes to the evaluator may help, and right now I'm coding that. Thanks for your help. Also, could you reproduce the same result?

hmgomes commented 6 years ago

It is a good idea to print what Kafka passes to the evaluator; by observing the class value we should be able to check whether the features (input and output) were used correctly. Regarding reproducing the same result: I haven't tried it yet, but I will need to look into KafkaReader in the near future anyway, so I will try to double-check it as soon as possible. Please also let me know if you find something, and thanks for your help on this matter :)

saraAlizadeh commented 6 years ago

Hi, I googled and talked to some Kafka experts; apparently Kafka's output is a String. I think that's why my results are so oddly wrong. Can we change the output type?

hmgomes commented 6 years ago

Hi @saraAlizadeh,

That may be the cause of the issue; perhaps an implicit conversion is taking place, but it is still strange that you get 1.0 accuracy. Did you have the chance to observe the values Kafka passes to the evaluator? You can change the confusion() function in the ConfusionMatrix subclass, something like this:

def confusion(x: (Example, Double)): Map[String, Double] = {
    val tp = if ((x._1.labelAt(0) == x._2) && (x._2 == 0.0)) 1.0 else 0.0
    val fn = if ((x._1.labelAt(0) != x._2) && (x._2 == 0.0)) 1.0 else 0.0
    val fp = if ((x._1.labelAt(0) != x._2) && (x._2 == 1.0)) 1.0 else 0.0
    val tn = if ((x._1.labelAt(0) == x._2) && (x._2 == 1.0)) 1.0 else 0.0

    // log the true label and the prediction for every example processed
    logInfo("def confusion(...) label: %f and prediction: %f".format(x._1.labelAt(0), x._2))

    Map("tp" -> tp, "fn" -> fn, "fp" -> fp, "tn" -> tn)
}

This will show, in the log, the actual labels and predictions, even though the predictions might not be that interesting at this point.

Best Regards,

Heitor

saraAlizadeh commented 6 years ago

Hi, sorry for the delay. I solved this issue; it was more a lack of code than bad code! I used logInfo to print out some values, and I found out the following:

  1. All labels are read as '0'.
  2. All predicted labels are '0'.
  3. The number of classes is the default value, not the value defined by the dataset.
  4. The number of features is '0'.
  5. None of the readers work properly except FileReader, and even FileReader only handles .arff files correctly, not CSV files.
  6. The reason is this: .arff files have a description (header). A function, fromArff, reads it and then builds the 'example specification', which is the input to other important methods such as learning and prediction. So unless your file is an .arff file, nothing will go right.

So this is what I did for Kafka: I made an .arff file containing only the description (header) part of my dataset and no data. I called fromArff in my version of KafkaReader to build the example specification. Once I had a correct specification of my data, I started Kafka and read the lines of the dataset.
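
To make the workaround concrete, a header-only file of this kind could look roughly like the sketch below (attribute names and class values are purely illustrative, not the actual covtypeNorm schema); only the declarations matter, since the instances themselves arrive through the Kafka topic:

@relation covtypeNorm-header

@attribute attr1 numeric
@attribute attr2 numeric
@attribute class {1,2,3,4,5,6,7}

@data
% no data rows here; fromArff only needs the declarations above to build the example specification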

There were plenty of details, but I think the whole story is clear now. I hope this is helpful to others, and thank you @hmgomes for guiding me. You were really helpful.

hmgomes commented 6 years ago

Thanks @saraAlizadeh for your detailed analysis and workaround for this problem. As soon as I finish what I am currently working on, I will update those readers and test them individually. Once again, thanks for taking the time to look into this; I hope that in the future it becomes easier to use the Kafka reader.

Best wishes,

Heitor