ibm-watson-data-lab / spark.samples

Tutorials and samples that show you how to get the most out of IBM Analytics for Apache Spark
https://developer.ibm.com/clouddataservices/docs/spark/
Apache License 2.0
79 stars, 67 forks

Using the new 1.5 jar no tweets come in #2

Closed cattoire closed 8 years ago

cattoire commented 8 years ago

Using the new 1.5 jar, I never succeed in collecting any tweets. I am quite sure about the credentials, because when I use the same ones with the 1.2 jar, tweets are collected. Of course, the tone analysis then doesn't work, as it is still trying to call the API the old way.

I have been looking into the code but can't seem to find the actual issue.

DTAIEB commented 8 years ago

There were some changes in the Tone Analyzer API and Event Hub (now Message Connect) that broke this app. I'm in the process of updating this app and the associated tutorial to fix it. Stay tuned!

NicPDev commented 8 years ago

Dear DTAIEB, are you still in the process of updating, or, in other words, should it already work as intended? I keep getting:

    Name: org.apache.kafka.common.config.ConfigException
    Message: Invalid value com.ibm.cds.spark.samples.StatusSerializer for configuration value.serializer: Class com.ibm.cds.spark.samples.StatusSerializer could not be found.
    StackTrace: org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:204)
    org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:122)
    org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)

and

    Receiver Started: KafkaReceiver-0
    Receiver Stopped: KafkaReceiver-0
    Reason: org.apache.kafka.common.config.ConfigException: Invalid value com.ibm.cds.spark.samples.StatusDeserializer for configuration value.deserializer: Class com.ibm.cds.spark.samples.StatusDeserializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:204)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:122)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)

This happens right after trying to start the streaming service. I have also been looking into the code for quite some time now and can't seem to find the issue either.
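For anyone hitting the same ConfigException: both traces say that a class named in the Kafka configuration could not be found when the config was parsed. A quick way to narrow this down is to ask the JVM directly whether those class names can be loaded. The following is only a minimal sketch; the object `ClasspathCheck` and its methods `isLoadable` and `report` are hypothetical helper names, not part of the sample app or of Kafka.

```scala
import scala.util.Try

// Hypothetical helper (not part of spark.samples): checks whether class names
// can be resolved by the class loader of the JVM this code runs in.
object ClasspathCheck {

  // True if the class can be loaded, without running its static initializers.
  // Uses the thread's context class loader, falling back to this object's loader.
  def isLoadable(className: String): Boolean = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(className, false, loader)).isSuccess
  }

  // Maps each class name to whether it could be loaded.
  def report(classNames: Seq[String]): Map[String, Boolean] =
    classNames.map(name => name -> isLoadable(name)).toMap
}
```

In a notebook, one could call `ClasspathCheck.report(Seq("com.ibm.cds.spark.samples.StatusSerializer", "com.ibm.cds.spark.samples.StatusDeserializer"))`. Note that this only inspects the JVM where it runs (typically the driver); the receiver may run in a different JVM with a different classpath, so a `true` here does not rule out the error there.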

DTAIEB commented 8 years ago

Hi @NiPfeuffer, this is a known issue caused by a recent change in the service. A fix will be provided very soon. Please check back on the tutorial often for updates. Thanks

NicPDev commented 8 years ago

Hey @DTAIEB, it seems the streaming part is working now, thanks for that. However, the tweets still do not seem to be processed properly.

At least that is the case in the tutorial that does not use Kafka but directly creates an instance of your StreamingTwitter class: I get the message "Batch completed with 230 Records" from the streaming listener, but in the end I could not collect any tweets.

The StreamingService in the tutorial with Kafka also seems to work, but the listener reports "Batch completed with 0 records".

I guess the issue lies within lines 171 to 203, as this is the part that should set the workingRDD.

    workingRDD = sc.parallelize( rdd.map( t => t._1 ).collect()).union( workingRDD )

    val saveToCloudant = broadcastVar.value.get("cloudant.save").get.toBoolean
    if ( saveToCloudant ){
      rdd.foreachPartition { iterator =>
        var db: CouchDbApi = null;
        val couch = CouchDb(
          broadcastVar.value.get("cloudant.hostName").get,
          broadcastVar.value.get("cloudant.port").get.toInt,
          broadcastVar.value.get("cloudant.https").get.toBoolean,
          broadcastVar.value.get("cloudant.username").get,
          broadcastVar.value.get("cloudant.password").get
        );
        val dbName = "spark-streaming-twitter"
        couch.dbs.get(dbName).attemptRun match{
          case -\/(e) => logger.trace("Couch Database does not exist, creating it now"); couch.dbs.create(dbName).run
          case \/-(a) => println("Connected to cloudant db " + dbName )
        }
        val typeMapping = TypeMapping(classOf[ToneAnalyzer.Tweet] -> "Tweet")
        db = couch.db(dbName, typeMapping)
        iterator.foreach( t => {
          saveTweetToCloudant( client, db, t._2._2, t._2._1 )
        } )
      }
    }
    }
    }catch{
      case e: InterruptedException=>//Ignore
      case e: Exception => logError(e.getMessage, e )
    }finally{
      canStopTwitterStream = true
    }
    })

Could you please check this issue? Thanks a lot for your help.

DTAIEB commented 8 years ago

I believe this issue has been resolved with version 1.6. Closing for now, but feel free to re-open if issues persist.