robinhood / faust

Python Stream Processing

Error while rolling log segments for a topic #525

Open Mahamutha opened 4 years ago

Mahamutha commented 4 years ago

Checklist

Steps to reproduce

How can I prevent the topic/log file from being deleted when the error "Error while rolling log segments for ble_rtls-0" occurs? Below is the log retention policy I have configured:

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
delete.topic.enable = false

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=1

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
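The settings above are broker-wide. Retention can also be overridden per topic, and a topic-level override takes precedence over the broker defaults. A sketch using the standard kafka-configs tool (the broker address is a placeholder, and older Kafka versions may need --zookeeper instead of --bootstrap-server):

```shell
# Hypothetical broker address; adjust for your cluster.
# Show any per-topic overrides currently set on ble_rtls:
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name ble_rtls --describe

# Override retention for this topic only (e.g. keep 24 hours of data):
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name ble_rtls \
  --alter --add-config retention.ms=86400000
```

Note that this only tunes when segments become eligible for deletion; it does not by itself explain the FileNotFoundException in the traceback below.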

Expected behavior

When the offset is out of range for the partition, the consumer offset gets reset. How can I prevent this? Which property do I need to set?

Actual behavior

Once the offset goes out of range, the topic gets deleted and the Faust Kafka streaming application crashes.

Full traceback

[2020-02-07 20:03:27,692] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-02-07 20:12:51,562] ERROR Error while rolling log segment for ble_rtls-1 in dir /var/lib/kafka/data (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /var/lib/kafka/data/ble_rtls-1/00000000000022839498.index (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:121)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:183)
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:501)
at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1479)
at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1479)
at scala.Option.foreach(Option.scala:257)
at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479)
at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1465)
at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
at kafka.log.Log.roll(Log.scala:1465)
at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1450)
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:858)
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:752)
at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
at kafka.log.Log.append(Log.scala:752)
at kafka.log.Log.appendAsLeader(Log.scala:722)
at kafka.cluster.Partition$$anonfun$15.apply(Partition.scala:634)
at kafka.cluster.Partition$$anonfun$15.apply(Partition.scala:622)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:621)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:745)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:733)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:733)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:471)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:482)
at kafka.server.KafkaApis.handle(KafkaApis.scala:106)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)
[2020-02-07 20:12:51,592] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /var/lib/kafka/data (kafka.server.ReplicaManager)
[2020-02-07 20:12:51,597] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions __consumer_offsets-22,__consumer_offsets-30,__consumer_offsets-8,__consumer_offsets-21,__consumer_offsets-4,rtls_ble-6,__consumer_offsets-27,__consumer_offsets-7,__consumer_offsets-9,rtls_ble-5,__consumer_offsets-46,ble_direction-direction_app-directionJson.mac-repartition-4,rtls_ble-4,ble_direction-direction_app-directionJson.mac-repartition-0,__consumer_offsets-25,__consumer_offsets-35,__consumer_offsets-41,rtls_ble-1,__consumer_offsets-33,__consumer_offsets-23,__consumer_offsets-49,ble_direction-direction_app-directionJson.mac-repartition-7,rfid_direction-0,rtls_ble-2,__consumer_offsets-47,__consumer_offsets-16,__consumer_offsets-28,location_event_data-0,direction_app-ble_direction_table-changelog-1,__consumer_offsets-31,__consumer_offsets-36,directionality-0,direction_app-ble_direction_table-changelog-3,rtls_ble-7,__consumer_offsets-42,rtls-1,__consumer_offsets-3,direction_app-ble_direction_table-changelog-2,__consumer_offsets-18,__consumer_offsets-37,__consumer_offsets-15,__consumer_offsets-24,direction_app-ble_direction_table-changelog-0,direction_app-ble_direction_table-changelog-4,ble_direction-direction_app-directionJson.mac-repartition-6,zoid_location_event_data-0,__consumer_offsets-38,ble_direction-direction_app-directionJson.mac-repartition-2,__consumer_offsets-17,ble_direction-0,__consumer_offsets-48,direction_app-ble_direction_table-changelog-7,zoid_location_event_data-1,direction_app-ble_direction_table-changelog-6,__consumer_offsets-19,__consumer_offsets-11,ble_direction-direction_app-directionJson.mac-repartition-5,ble_direction-direction_app-directionJson.mac-repartition-1,rtls-0,__consumer_offsets-13,ble_rtls-1,__consumer_offsets-2,__consumer_offsets-43,__consumer_offsets-6,__consumer_offsets-14,rtls_ble-3,ble_direction-direction_app-directionJson.mac-repartition-3,rtls_ble-0,__consumer_offsets-20,__consumer_offsets-0,__consumer_offsets-44,direction_app-ble_direction_table-changelog-5,__consumer_offsets-39,__consumer_offsets-12,ble_rtls-0,__consumer_offsets-45,__consumer_offsets-1,__consumer_offsets-5,__consumer_offsets-26,__consumer_offsets-29,direction_app-__assignor-__leader-0,__consumer_offsets-34,__consumer_offsets-10,rtls_app-__assignor-__leader-0,__consumer_offsets-32,__consumer_offsets-40 (kafka.server.ReplicaFetcherManager)
[2020-02-07 20:12:51,598] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions __consumer_offsets-22,__consumer_offsets-30,__consumer_offsets-8,__consumer_offsets-21,__consumer_offsets-4,rtls_ble-6,__consumer_offsets-27,__consumer_offsets-7,__consumer_offsets-9,rtls_ble-5,__consumer_offsets-46,ble_direction-direction_app-directionJson.mac-repartition-4,rtls_ble-4,ble_direction-direction_app-directionJson.mac-repartition-0,__consumer_offsets-25,__consumer_offsets-35,__consumer_offsets-41,rtls_ble-1,__consumer_offsets-33,__consumer_offsets-23,__consumer_offsets-49,ble_direction-direction_app-directionJson.mac-repartition-7,rfid_direction-0,rtls_ble-2,__consumer_offsets-47,__consumer_offsets-16,__consumer_offsets-28,location_event_data-0,direction_app-ble_direction_table-changelog-1,__consumer_offsets-31,__consumer_offsets-36,directionality-0,direction_app-ble_direction_table-changelog-3,rtls_ble-7,__consumer_offsets-42,rtls-1,__consumer_offsets-3,direction_app-ble_direction_table-changelog-2,__consumer_offsets-18,__consumer_offsets-37,__consumer_offsets-15,__consumer_offsets-24,direction_app-ble_direction_table-changelog-0,direction_app-ble_direction_table-changelog-4,ble_direction-direction_app-directionJson.mac-repartition-6,zoid_location_event_data-0,__consumer_offsets-38,ble_direction-direction_app-directionJson.mac-repartition-2,__consumer_offsets-17,ble_direction-0,__consumer_offsets-48,direction_app-ble_direction_table-changelog-7,zoid_location_event_data-1,direction_app-ble_direction_table-changelog-6,__consumer_offsets-19,__consumer_offsets-11,ble_direction-direction_app-directionJson.mac-repartition-5,ble_direction-direction_app-directionJson.mac-repartition-1,rtls-0,__consumer_offsets-13,ble_rtls-1,__consumer_offsets-2,__consumer_offsets-43,__consumer_offsets-6,__consumer_offsets-14,rtls_ble-3,ble_direction-direction_app-directionJson.mac-repartition-3,rtls_ble-0,__consumer_offsets-20,__consumer_offsets-0,__consumer_offsets-44,direction_app-ble_direction_table-changelog-5,__consumer_offsets-39,__consumer_offsets-12,ble_rtls-0,__consumer_offsets-45,__consumer_offsets-1,__consumer_offsets-5,__consumer_offsets-26,__consumer_offsets-29,direction_app-__assignor-__leader-0,__consumer_offsets-34,__consumer_offsets-10,rtls_app-__assignor-__leader-0,__consumer_offsets-32,__consumer_offsets-40 (kafka.server.ReplicaAlterLogDirsManager)
[2020-02-07 20:12:51,601] ERROR Error while rolling log segment for ble_rtls-1 in dir /var/lib/kafka/data (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /var/lib/kafka/data/ble_rtls-1/00000000000022839498.index (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:121)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:183)
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:501)
at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1479)
at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1479)
at scala.Option.foreach(Option.scala:257)
at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479)
at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1465)
at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
at kafka.log.Log.roll(Log.scala:1465)
at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1450)
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:858)
at kafka.log.Log$$anonfun$append$2.apply(Log.scala:752)
at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
at kafka.log.Log.append(Log.scala:752)
at kafka.log.Log.appendAsLeader(Log.scala:722)
at kafka.cluster.Partition$$anonfun$15.apply(Partition.scala:634)
at kafka.cluster.Partition$$anonfun$15.apply(Partition.scala:622)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:621)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:745)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:733)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:733)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:471)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:482)
at kafka.server.KafkaApis.handle(KafkaApis.scala:106)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)
[the same ERROR and identical traceback for ble_rtls-1 repeat several more times]

Versions

StephenSorriaux commented 4 years ago

Hello,

Is this error related to Faust? It looks like your Kafka broker is trying to access a segment index file that no longer exists on disk, which is a broker-side issue rather than a Faust one. Concerning your question: when a consumer group sits at an offset that no longer exists, Faust uses the consumer_auto_offset_reset option to decide what to do; by default it resets to the earliest available offset.
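For illustration, this is where that option lives in a Faust app. A configuration sketch only (the app id and broker URL are placeholders, not taken from the issue, and it needs a running broker to actually start):

```python
import faust

# Hypothetical app id and broker address for illustration.
app = faust.App(
    'direction_app',
    broker='kafka://localhost:9092',
    # What to do when the committed offset no longer exists on the broker:
    # 'earliest' (the default) rewinds to the oldest available offset,
    # 'latest' skips ahead to the newest.
    consumer_auto_offset_reset='earliest',
)
```

Note that this only controls how the consumer recovers after offsets disappear; it cannot prevent the broker from deleting data under its retention policy.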