dibbhatt / kafka-spark-consumer

High-performance Kafka connector for Spark Streaming. Supports multi-topic fetch and Kafka security, with reliable offset management in ZooKeeper. No data loss, no dependency on HDFS or WAL, a built-in PID rate controller, message-handler support, and an offset lag checker.
Apache License 2.0

Unable to pull Batch info metrics using StreamingListener Interface #6

Closed akhld closed 9 years ago

akhld commented 9 years ago

I have a JobListener extending the StreamingListener interface to perform some tasks onBatchCompleted. It works fine with native Spark Streaming and KafkaUtils, but it gives wrong values when used with this low-level consumer.

Here's the Listener class:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

private class JobListener(ssc: StreamingContext) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {

    println("====> Total delay: " + batchCompleted.batchInfo.totalDelay.getOrElse(-1) + " ms")

    // Sum the record counts reported for each received block in this batch.
    var totalProcessedRecords = 0L
    batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
      totalProcessedRecords += infos.map(_.numRecords).sum
    }
    println("\n=====> Received Events: " + totalProcessedRecords)

  }
}

You can attach it to your ssc as:

val listen = new JobListener(ssc)
ssc.addStreamingListener(listen)

ssc.start()
ssc.awaitTermination()

Let me know if there's some other way to pull batch info.

akhld commented 9 years ago


Any idea on how to get these numbers up?

dibbhatt commented 9 years ago

Hi, sorry for replying late on this. I think this is an issue with Spark. If you look at how I generate the blocks and write them to the BlockManager, I use the following call:

_receiver.store(_dataBuffer.iterator());

You can see this in PartitionManager.java at line 216.

I think when the block is written this way, Spark is not able to calculate the number of records properly.

You can probably raise a JIRA for Spark.
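For context, in the Spark 1.x Receiver API the `store(Iterator[T])` overload hands Spark an iterator whose size it cannot know up front, whereas `store(ArrayBuffer[T])` stores a materialized block whose element count Spark can record and surface in the Streaming UI. A minimal, hypothetical sketch of a receiver using the counted variant (class and method names here are illustrative, not from this project):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver sketch, assuming the Spark 1.x Receiver API.
class CountedReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY_SER) {

  def onStart(): Unit = { /* start fetch threads here */ }
  def onStop(): Unit = { /* stop fetch threads here */ }

  // Storing an ArrayBuffer (rather than an Iterator) lets Spark record the
  // block's element count, so the UI can report records per batch.
  def storeBlock(dataBuffer: ArrayBuffer[Array[Byte]]): Unit = {
    store(dataBuffer)            // counted block
    // store(dataBuffer.iterator) // uncounted: size unknown to Spark
  }
}
```

Whether this is the actual cause of the zero/negative counts would need to be confirmed against the Spark version in use.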

Dibyendu

akhld commented 9 years ago

Thanks for the reply Dibyendu. Will try to raise a JIRA with Spark.

dibbhatt commented 9 years ago

Hi @akhld . I have fixed the UI Reporting issue in latest consumer. Please take a look at it and let me know how it looks ..

Dibyendu

akhld commented 9 years ago

Ohh nice. Will let you know. :)

akhld commented 9 years ago

I cloned the latest version and ran it once, but it doesn't show any metrics in the streaming tab. Previously it showed negative values; now it's 0.


dibbhatt commented 9 years ago

Are you pushing messages to Kafka? The first few batches come in as zero. Refresh the page and check if you see the stats.

akhld commented 9 years ago

Yes, messages are pushed, and it's consuming and printing the count as well.

I ran the job for a minute and it processed around 50 batches (with a batch interval of 1 second). As the screenshot shows, "Processed Batches" is also broken, as it always reads 1. I'm using Spark 1.3.1 and recompiled the low-level Kafka consumer against Spark 1.3.1.

dibbhatt commented 9 years ago

Do you have a 1.2 setup? My team has successfully tested on 1.2. I am also presently doing some work against Spark master (1.4), and it works there too. Not tested with 1.3.1, though.

akhld commented 9 years ago

I just tested it on 1.2.2, still the same. I'm trying in local mode.

dibbhatt commented 9 years ago

Closing this, as the latest changes solved this problem.

dibbhatt commented 9 years ago

Hi @akhld

I have created a JIRA to track the progress of contributing back this project to Apache Spark.

https://issues.apache.org/jira/browse/SPARK-11045

This project is presently in spark-packages, and I believe this is the right time to contribute it to the Apache Spark project and give the larger community better options for Kafka connectivity in Spark Streaming.

Kindly vote for this JIRA.

akhld commented 9 years ago

Great, voted already. Hope people will start contributing to it.


Cheers!

dibbhatt commented 9 years ago

Thanks a lot Akhil. Much appreciated.