dibbhatt / kafka-spark-consumer

High-performance Kafka connector for Spark Streaming. Supports multi-topic fetch and Kafka security. Reliable offset management in ZooKeeper. No data loss. No dependency on HDFS or WAL. Built-in PID rate controller. Supports a message handler. Offset lag checker.
Apache License 2.0

Manipulation of offsetRanges in each batch #66

Open · Bonnie16 opened this issue 5 years ago

Bonnie16 commented 5 years ago

Hello, I am using the Spark Streaming and Kafka Java APIs, with org.apache.spark version 2.3.0 and org.apache.kafka version 0.10.

I am using the sample code provided here: https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html, but I modify the OffsetRanges in each batch, as shown below:

```java
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // make some change to the OffsetRanges
  ...

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
```

The problem is that after the modification (the offset ranges did change) and the commitAsync call, the next batch did not receive the OffsetRanges I expected.

How should I resolve this?
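For context, here is a minimal toy model of the offset bookkeeping involved. It is a sketch under an assumption, not Spark's code: as I read the 0-10 direct stream, the driver plans each batch from its own in-memory position (the previous batch's until offsets), while commitAsync only records each range's untilOffset as the consumer group's committed offset. That committed offset is consulted again only when a stream starts without explicit offsets (and without checkpoint recovery). The class `OffsetBookkeepingModel`, its `Range` type, its methods, and the `"topic-0"` key format are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT Spark's implementation: it only mimics the assumed
// split between the stream's in-memory position and the group's committed offsets.
public class OffsetBookkeepingModel {

    // Immutable stand-in for an offset range [fromOffset, untilOffset) of one partition.
    static final class Range {
        final String partition; // hypothetical key such as "topic-0"
        final long fromOffset;
        final long untilOffset;

        Range(String partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        @Override
        public String toString() {
            return partition + " [" + fromOffset + ", " + untilOffset + ")";
        }
    }

    // Position the driver uses to plan the next batch (kept in memory).
    private final Map<String, Long> currentOffsets;
    // Offsets committed for the consumer group (only read again on a restart).
    private final Map<String, Long> committedOffsets = new HashMap<>();

    OffsetBookkeepingModel(Map<String, Long> startOffsets) {
        this.currentOffsets = new HashMap<>(startOffsets);
    }

    // Plans a batch from the in-memory position up to the latest offsets, then advances.
    List<Range> nextBatch(Map<String, Long> latestOffsets) {
        List<Range> ranges = new ArrayList<>();
        for (Map.Entry<String, Long> e : currentOffsets.entrySet()) {
            long until = latestOffsets.get(e.getKey());
            ranges.add(new Range(e.getKey(), e.getValue(), until));
        }
        for (Range r : ranges) {
            currentOffsets.put(r.partition, r.untilOffset);
        }
        return ranges;
    }

    // Records each range's untilOffset as committed; never touches currentOffsets.
    void commitAsync(List<Range> ranges) {
        for (Range r : ranges) {
            committedOffsets.put(r.partition, r.untilOffset);
        }
    }

    // A restarted stream (no explicit offsets, no checkpoint) starts from the committed offsets.
    OffsetBookkeepingModel restart() {
        return new OffsetBookkeepingModel(committedOffsets);
    }

    public static void main(String[] args) {
        OffsetBookkeepingModel stream = new OffsetBookkeepingModel(Map.of("topic-0", 0L));

        List<Range> batch1 = stream.nextBatch(Map.of("topic-0", 100L));
        System.out.println("batch 1: " + batch1); // prints "batch 1: [topic-0 [0, 100)]"

        // "Modify" the ranges by building new immutable ones, then commit them.
        stream.commitAsync(List.of(new Range("topic-0", 0L, 50L)));

        List<Range> batch2 = stream.nextBatch(Map.of("topic-0", 150L));
        System.out.println("batch 2: " + batch2); // prints "batch 2: [topic-0 [100, 150)]"

        List<Range> afterRestart = stream.restart().nextBatch(Map.of("topic-0", 150L));
        System.out.println("after restart: " + afterRestart); // prints "after restart: [topic-0 [50, 150)]"
    }
}
```

Under this assumption, committing modified ranges changes where a restarted stream begins, not what the next batch of the running stream contains. Note also that in the real API `OffsetRange` instances are immutable, so "modifying" them means creating new ones (for example with `OffsetRange.create`).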