Closed fzschornack closed 9 years ago
Hi,
Yes, I figured out few hours ago. I tried to delete the issue but I couldn't, sorry to bother you.
I did exactly as you said and used the payload. It is working fine.
Thank you so much for creating this receiver, it helped me a lot!
Best, On Aug 27, 2015 01:28, "Dibyendu Bhattacharya" notifications@github.com wrote:
Hi,
Did you figure it out how to get the byte messages from Kafka ?
The ReceiverLauncher.launch returns the DStream or JavaDStream (for Java) , and you can call the getPayload on MessageAndMetadata object to get the byte array...
Something like this ..
val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)
//Lets convert the Array[Byte] to String val stream = tmp_stream.map(x => { val s = new String(x.getPayload); s })
— Reply to this email directly or view it on GitHub https://github.com/dibbhatt/kafka-spark-consumer/issues/24#issuecomment-135287217 .
Hi,
Not an issue. You can raise any issue/question . Its perfectly fine.
Glad to know this receiver is useful to you ..
hi @fzschornack
I have created a JIRA to track the progress of contributing back this project to Apache Spark.
https://issues.apache.org/jira/browse/SPARK-11045
This project is now presently in spark-packages and I believe this is the correct time to contribute it to Apache Spark Project and give better options to larger community around Kafka Connectivity for Spark Streaming.
kindly Vote for this JIRA.
Voted!
thanks
Hi,
Did you figure it out how to get the byte messages from Kafka ?
The ReceiverLauncher.launch returns the DStream or JavaDStream (for Java) , and you can call the getPayload on MessageAndMetadata object to get the byte array...
Something like this ..
val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)
//Lets convert the Array[Byte] to String val stream = tmp_stream.map(x => { val s = new String(x.getPayload); s })