koeninger / kafka-exactly-once


kafka consumer metrics #13

Closed. mazorigal closed this issue 7 years ago.

mazorigal commented 7 years ago

Hi,

I would like to verify a small thing. When my stream processing takes longer than the micro-batch interval, I see tasks queuing up, showing how many records would be consumed. Is the consumption actually done only once a task starts? (That is, the tasks in the queue are not consuming from Kafka until they are actually executed?)

My workflow is quite simple: I read from Kafka, do some simple processing, then reshuffle the data for some further processing (reduceByKey). Usually the job completes fast enough that no tasks are queued, but from time to time I see quite a large delay, usually on only 1-2 workers, in the stage that consumes data from Kafka. I have started to suspect that the bottleneck is consuming from Kafka, so I would like to somehow measure how long it actually takes to consume each topic/partition before the data-processing part starts. Any idea how to do that? Maybe I can somehow access more consumer metrics?

Thanks,
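For reference, a minimal sketch of the kind of job described above, assuming the spark-streaming-kafka-0-10 direct stream; the brokers, batch interval, and processing logic are hypothetical placeholders (the topic and group id are taken from later in the thread). With this setup, the records for each offset range are fetched inside the task that computes that partition of the KafkaRDD, not when the batch is queued.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object MyApp {
  def main(args: Array[String]): Unit = {
    // micro-batch interval is a placeholder
    val ssc = new StreamingContext(new SparkConf().setAppName("my-app"), Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092", // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-app",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Seq("ad_view_events"), kafkaParams))

    // simple per-record processing, then a shuffle for further processing
    stream.map(r => (r.key, 1L)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```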

koeninger commented 7 years ago

You should be able to get a rough idea from logs, without really changing anything other than perhaps logging levels.

KafkaRDDIterator logs at info level when it starts consuming a particular offset range:

```scala
logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
  s"offsets ${part.fromOffset} -> ${part.untilOffset}")
```

CachedKafkaConsumer logs at debug level when it actually gets a particular offset:

```scala
logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
```
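A sketch of one way to surface both messages, assuming log4j as in a stock Spark setup. Note that this must take effect on the executors, since that is where the cached consumers run.

```scala
import org.apache.log4j.{Level, Logger}

// the "Computing topic ..." message from KafkaRDD is logged at INFO
Logger.getLogger("org.apache.spark.streaming.kafka010.KafkaRDD")
  .setLevel(Level.INFO)
// the "Get ..." message from CachedKafkaConsumer is logged at DEBUG
Logger.getLogger("org.apache.spark.streaming.kafka010.CachedKafkaConsumer")
  .setLevel(Level.DEBUG)
```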


mazorigal commented 7 years ago

My Spark log level is INFO: `Logger.getLogger("org.apache.spark").setLevel(Level.INFO)`

but on the executors I can only see the following Kafka-related log:

```
[INFO] 2017-06-26 14:30:04,972 [Executor task launch worker-2] org.apache.spark.streaming.kafka010.KafkaRDD logInfo - Computing topic ad_view_events, partition 4 offsets 615377965 -> 615379565
```

What am I missing?

koeninger commented 7 years ago

The other log I mentioned is at debug level, not info.

But for the log you showed, the offset range start and end are the same anyway.

mazorigal commented 7 years ago

Once I set the debug level, I indeed see a lot of the CachedKafkaConsumer messages. But I am still confused about how those messages can help me determine how long pulling data from Kafka to a Spark worker took. I also noticed that for some micro-batches, usually on 1-2 workers, fetching data from Kafka takes far too long (a different worker each time), and that this time was always related to the request.timeout.ms consumer config. So while it usually takes a worker less than 1 second to pull data from a Kafka partition, sometimes I see workers for which it takes exactly the time set for request.timeout.ms.
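For reference, request.timeout.ms is an ordinary consumer property, so it is set through the same kafkaParams map passed to the direct stream. The values below are hypothetical placeholders, not recommendations.

```scala
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092", // placeholder
  "group.id" -> "my-app",
  // request.timeout.ms bounds how long a fetch can hang before the client
  // gives up and retries; 0.10-era clients require it to exceed
  // session.timeout.ms
  "request.timeout.ms" -> (40000: java.lang.Integer),
  "session.timeout.ms" -> (30000: java.lang.Integer)
)
```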

koeninger commented 7 years ago

The messages are timestamped, so the relative time between the iterator and consumer messages should give you a rough idea.

As far as the request timeout goes, you may want to take a look at https://issues.apache.org/jira/browse/KAFKA-4753, which is possibly related.
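If correlating log timestamps proves too cumbersome, one cruder alternative (an illustration only, not something suggested in the thread) is to time the first fetch inside the stage itself, reusing the stream from the sketch above. Since the direct stream's iterator pulls from Kafka lazily, the time to the first record roughly approximates the fetch latency for that task's topic/partition.

```scala
stream.foreachRDD { rdd =>
  rdd.mapPartitions { iter =>
    val t0 = System.nanoTime
    // hasNext/next on the Kafka iterator triggers the actual poll
    val first = if (iter.hasNext) Iterator.single(iter.next()) else Iterator.empty
    val fetchMs = (System.nanoTime - t0) / 1000000
    org.apache.log4j.Logger.getLogger("fetch-timing")
      .info(s"time to first record: ${fetchMs} ms")
    first ++ iter
  }.count()
}
```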


mazorigal commented 7 years ago

Indeed, using the timestamps from the logs is possible, but quite cumbersome. If the various Kafka consumer metrics could be added as part of Spark's reported metrics, I think that would be quite helpful, especially when debugging is required.

koeninger commented 7 years ago

Sure, I'm just suggesting something immediate with no code changes necessary to help you with debugging. Feel free to add your own metrics or submit a PR to the spark project.
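As one possible starting point for the "add your own metrics" route, a StreamingListener can report per-batch timing without patching Spark, though it is batch-level rather than per-partition. A sketch:

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // schedulingDelay: how long the batch sat in the queue
    // processingDelay: how long the batch took to run (Kafka fetch included)
    println(s"batch ${info.batchTime}: " +
      s"scheduling=${info.schedulingDelay.getOrElse(-1L)}ms " +
      s"processing=${info.processingDelay.getOrElse(-1L)}ms")
  }
}

// registered on the StreamingContext from the sketch above:
// ssc.addStreamingListener(new BatchTimingListener())
```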


mazorigal commented 7 years ago

OK, thanks. I am also trying to understand how I can track the consumer using Kafka's tools. I set the consumer group id to some value (let's say my-app), and I noticed that my-app is used as the group id only for the driver; for the executors, a spark-executor- prefix is added. However, using the Kafka consumer-groups command-line tool, I can see the my-app group id, but I have never seen the spark-executor-my-app group id. Am I missing something?