AutoMQ / automq

AutoMQ is a cloud-first alternative to Kafka by decoupling durability to S3 and EBS. 10x Cost-Effective. No Cross-AZ Traffic Cost. Autoscale in seconds. Single-digit ms latency.
https://www.automq.com/docs

[BUG] KRaft metadata cluster goes down because a committed record batch exceeds the limit #2057

Open lifepuzzlefun opened 1 week ago

lifepuzzlefun commented 1 week ago

Version & Environment

master

What went wrong?

The Raft quorum has no available leader: each newly elected leader steps down because a commit fails.

[2024-10-09 18:39:02,084] INFO [RaftManager id=2] Become candidate due to fetch timeout (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:02,085] INFO [RaftManager id=2] Completed transition to CandidateState(localId=2, localDirectoryId=ZopiWufpyFjYOJ-CzWKiMQ,epoch=951, retries=1, voteStates={1=UNRECORDED, 2=GRANTED, 3=UNRECORDED}, highWatermark=Optional[LogOffsetMetadata(offset=3186751955, metadata=Optional.empty)], electionTimeoutMs=1495) from FollowerState(fetchTimeoutMs=2000, epoch=950, leaderId=1, voters=[1, 2, 3], highWatermark=Optional[LogOffsetMetadata(offset=3186751955, metadata=Optional.empty)], fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState)
[2024-10-09 18:39:02,087] INFO [RaftManager id=2] Completed transition to Leader(localId=2, epoch=951, epochStartOffset=3186751957, highWatermark=Optional.empty, voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=false), 2=ReplicaState(nodeId=2, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true), 3=ReplicaState(nodeId=3, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=false)}) from CandidateState(localId=2, localDirectoryId=ZopiWufpyFjYOJ-CzWKiMQ,epoch=951, retries=1, voteStates={1=GRANTED, 2=GRANTED, 3=UNRECORDED}, highWatermark=Optional[LogOffsetMetadata(offset=3186751955, metadata=Optional.empty)], electionTimeoutMs=1495) (org.apache.kafka.raft.QuorumState)
[2024-10-09 18:39:02,090] INFO [RaftManager id=2] High watermark set to LogOffsetMetadata(offset=3186751958, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=260993522)]) for the first time for epoch 951 based on indexOfHw 1 and voters [ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=3186751958, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=260993522)])], lastFetchTimestamp=1728470342090, lastCaughtUpTimestamp=1728470342090, hasAcknowledgedLeader=true), ReplicaState(nodeId=2, endOffset=Optional[LogOffsetMetadata(offset=3186751958, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=260993522)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true), ReplicaState(nodeId=3, endOffset=Optional[LogOffsetMetadata(offset=3186751957, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=260993416)])], lastFetchTimestamp=1728470342089, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)] (org.apache.kafka.raft.LeaderState)
[2024-10-09 18:39:05,994] ERROR Encountered quorum controller fault: commitStreamSetObject: event failed with IllegalStateException (treated as UnknownServerException) at epoch 951 in 12120 microseconds. Renouncing leadership and reverting to the last committed offset 3186752114. (org.apache.kafka.server.fault.LoggingFaultHandler)
java.lang.IllegalStateException: Attempted to atomically commit 38457 records, but maxRecordsPerBatch is 25000
    at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:1034)
    at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:936)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:131)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:214)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:185)
    at java.base/java.lang.Thread.run(Thread.java:833)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Received user request to resign from the current epoch 951 (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Failed to handle fetch from 3 at 3186752115 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Failed to handle fetch from 1 at 3186752115 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Failed to handle fetch from 1001 at 3186752115 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Failed to handle fetch from 1004 at 3186752115 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Failed to handle fetch from 1003 at 3186752115 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Failed to handle fetch from 1002 at 3186752115 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:05,994] INFO [RaftManager id=2] Completed transition to ResignedState(localId=2, epoch=951, voters=[1, 2, 3], electionTimeoutMs=1366, unackedVoters=[1, 3], preferredSuccessors=[1, 3]) from Leader(localId=2, epoch=951, epochStartOffset=3186751957, highWatermark=Optional[LogOffsetMetadata(offset=3186752115, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=261000559)])], voterStates={1=ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=3186752115, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=261000559)])], lastFetchTimestamp=1728470345961, lastCaughtUpTimestamp=1728470345961, hasAcknowledgedLeader=true), 2=ReplicaState(nodeId=2, endOffset=Optional[LogOffsetMetadata(offset=3186752115, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=261000559)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true), 3=ReplicaState(nodeId=3, endOffset=Optional[LogOffsetMetadata(offset=3186752115, metadata=Optional[(segmentBaseOffset=3182036153,relativePositionInSegment=261000559)])], lastFetchTimestamp=1728470345961, lastCaughtUpTimestamp=1728470345961, hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState)
[2024-10-09 18:39:05,998] INFO [RaftManager id=2] Completed transition to Unattached(epoch=952, voters=[1, 2, 3], electionTimeoutMs=1238) from ResignedState(localId=2, epoch=951, voters=[1, 2, 3], electionTimeoutMs=1366, unackedVoters=[], preferredSuccessors=[1, 3]) (org.apache.kafka.raft.QuorumState)
[2024-10-09 18:39:05,998] INFO [RaftManager id=2] Completed transition to Voted(epoch=952, votedKey=ReplicaKey(id=1, directoryId=Optional.empty), voters=[1, 2, 3], electionTimeoutMs=1223, highWatermark=Optional.empty) from Unattached(epoch=952, voters=[1, 2, 3], electionTimeoutMs=1238) (org.apache.kafka.raft.QuorumState)
[2024-10-09 18:39:05,998] INFO [RaftManager id=2] Vote request VoteRequestData(clusterId='DMSoJVXo9Q', topics=[TopicData(topicName='__cluster_metadata', partitions=[PartitionData(partitionIndex=0, candidateEpoch=952, candidateId=1, lastOffsetEpoch=951, lastOffset=3186752115)])]) with epoch 952 is granted (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:06,002] INFO [RaftManager id=2] Completed transition to FollowerState(fetchTimeoutMs=2000, epoch=952, leaderId=1, voters=[1, 2, 3], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from Voted(epoch=952, votedKey=ReplicaKey(id=1, directoryId=Optional.empty), voters=[1, 2, 3], electionTimeoutMs=1223, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState)
[2024-10-09 18:39:06,100] INFO [RaftManager id=2] High watermark set to Optional[LogOffsetMetadata(offset=3186752116, metadata=Optional.empty)] for the first time for epoch 952 (org.apache.kafka.raft.FollowerState)
[2024-10-09 18:39:07,177] INFO [RaftManager id=2] Become candidate due to fetch timeout (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-09 18:39:07,179] INFO [RaftManager id=2] Completed transition to CandidateState(localId=2, localDirectoryId=ZopiWufpyFjYOJ-CzWKiMQ,epoch=953, retries=1, voteStates={1=UNRECORDED, 2=GRANTED, 3=UNRECORDED}, highWatermark=Optional[LogOffsetMetadata(offset=3186752217, metadata=Optional.empty)], electionTimeoutMs=1295) from FollowerState(fetchTimeoutMs=2000, epoch=952, leaderId=1, voters=[1, 2, 3], highWatermark=Optional[LogOffsetMetadata(offset=3186752217, metadata=Optional.empty)], fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState)

What should have happened instead?

How to reproduce the issue?

Create 50,000+ partitions and delete them with a concurrency of 100 in a cluster that has only 5 nodes. Then stop one node and start it again.

As a result, one node holds a large number of partitions. Once an upload is triggered, there may be a lot of StreamObjects and Streams to upload. I think the failure happens when the stream set object (SSO) is committed (commitStreamSetObject), and it may leave the whole cluster unable to function.

Additional information

Please attach any relevant logs, backtraces, or metric charts.

lifepuzzlefun commented 1 week ago

Logs on the local node:

[2024-10-09 18:44:46,987] ERROR Error while committing stream set object: CommitStreamSetObjectRequestData(nodeId=1001, nodeEpoch=1728455064371, objectId=-1, orderId=0, objectSize=0, objectStreamRanges=[], streamObjects=[StreamObject(objectId=506158709, objectSize=4017507, streamId=1, startOffset=598931560, endOffset=599268033, attributes=0), StreamObject(objectId=506158710, objectSize=1899, streamId=2, startOffset=2444640, endOffset=2445036, attributes=0), StreamObject(objectId=506158711, objectSize=600, streamId=909, startOffset=2745, endOffset=2747, attributes=0), StreamObject(objectId=506158712, objectSize=564, streamId=914, startOffset=3127, endOffset=3129, attributes=0), StreamObject(objectId=506158713, objectSize=504, streamId=929, startOffset=3070, endOffset=3072, attributes=0), StreamObject(objectId=506158714, objectSize=6221929, streamId=280272, startOffset=3800667875, endOffset=3800712716, attributes=0), StreamObject(objectId=506158715, objectSize=11824, streamId=280311, startOffset=5993268, endOffset=5996388, attributes=0), StreamObject(objectId=506158716, objectSize=6057254, streamId=281160, startOffset=4149131501, endOffset=4149175153, attributes=0), StreamObject(objectId=506158717, objectSize=11734, streamId=281179, startOffset=6539280, endOffset=6542376, attributes=0), StreamObject(objectId=506158718, objectSize=5914086, streamId=281330, startOffset=4062577915, endOffset=4062620536, attributes=0), StreamObject(objectId=506158719, objectSize=11824, streamId=281346, startOffset=6397752, endOffset=6400872, attributes=0), StreamObject(objectId=506158720, objectSize=6582790, streamId=281708, startOffset=4033410927, endOffset=4033458367, attributes=0), StreamObject(objectId=506158721, objectSize=11959, streamId=281801, startOffset=6353448, endOffset=6356604, attributes=0), StreamObject(objectId=506158722, objectSize=252, streamId=281982, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158723, objectSize=252, streamId=281986, 
startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158724, objectSize=252, streamId=281987, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158725, objectSize=252, streamId=281990, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158726, objectSize=252, streamId=281991, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158727, objectSize=252, streamId=281992, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158728, objectSize=252, streamId=281994, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158729, objectSize=252, streamId=281995, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158730, objectSize=424, streamId=281998, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158731, objectSize=252, streamId=281999, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158732, objectSize=252, streamId=282003, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158733, objectSize=252, streamId=282009, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158734, objectSize=252, streamId=282011, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158735, objectSize=252, streamId=282015, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158736, objectSize=252, streamId=282016, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158737, objectSize=252, streamId=282018, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158738, objectSize=424, streamId=282020, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158739, objectSize=252, streamId=282028, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158740, objectSize=252, streamId=282031, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158741, objectSize=252, streamId=282032, startOffset=8, 
endOffset=10, attributes=0), StreamObject(objectId=506158742, objectSize=252, streamId=282036, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158743, objectSize=252, streamId=282037, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158744, objectSize=424, streamId=282040, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158745, objectSize=424, streamId=282043, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158746, objectSize=252, streamId=282044, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158747, objectSize=424, streamId=282047, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158748, objectSize=252, streamId=282050, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158749, objectSize=252, streamId=282053, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158750, objectSize=252, streamId=282370, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158751, objectSize=252, streamId=282376, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158752, objectSize=252, streamId=282379, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158753, objectSize=252, streamId=282381, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158754, objectSize=252, streamId=282396, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158755, objectSize=252, streamId=282399, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158756, objectSize=252, streamId=282400, startOffset=8, endOffset=10, attributes=0), StreamObject(objectId=506158757, objectSize=424, streamId=282401, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158758, objectSize=424, streamId=282403, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158759, objectSize=424, streamId=282407, startOffset=10, endOffset=12, 
attributes=0), StreamObject(objectId=506158760, objectSize=424, streamId=283420, startOffset=12, endOffset=14, attributes=0), StreamObject(objectId=506158761, objectSize=252, streamId=283421, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158762, objectSize=252, streamId=283423, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158763, objectSize=252, streamId=283431, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158764, objectSize=252, streamId=283457, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158765, objectSize=252, streamId=283469, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158766, objectSize=252, streamId=283496, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158767, objectSize=252, streamId=283516, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158768, objectSize=252, streamId=283528, startOffset=10, endOffset=12, attributes=0), StreamObject(objectId=506158769, objectSize=749, streamId=286265, startOffset=4, endOffset=8, attributes=0), StreamObject(objectId=506158770, objectSize=749, streamId=286267, startOffset=4, endOffset=8, attributes=0), StreamObject(objectId=506158771, objectSize=749, streamId=286271, startOffset=6, endOffset=10, attributes=0), StreamObject(objectId=506158772, objectSize=749, streamId=286288, startOffset=10, endOffset=14, attributes=0), StreamObject(objectId=506158773, objectSize=749, streamId=286689, startOffset=4, endOffset=8, attributes=0), StreamObject(objectId=506158774, objectSize=749, streamId=286692, startOffset=4, endOffset=8, attributes=0), StreamObject(objectId=506158775, objectSize=749, streamId=287267, startOffset=6, endOffset=10, attributes=0), StreamObject(objectId=506158776, objectSize=749, streamId=287273, startOffset=4, endOffset=8, attributes=0), StreamObject(objectId=506158777, objectSize=749, streamId=287275, startOffset=4, endOffset=8, attributes=0), 
StreamObject(objectId=506158778, objectSize=749, streamId=287276,
...............

, StreamObject(objectId=506177973, objectSize=570, streamId=586022, startOffset=0, endOffset=4, attributes=0), StreamObject(objectId=506177974, objectSize=570, streamId=586028, startOffset=0, endOffset=4, attributes=0)], compactedObjectIds=[506139212], failoverMode=false, attributes=-1), code: UNKNOWN_SERVER_ERROR, retry later (kafka.log.stream.s3.objects.ControllerObjectManager)

superhx commented 14 hours ago

It is caused by the force split in SSO compaction.

I think we should limit the number of Streams for an Object in DeltaWALUploadTask to less than 10,000 to avoid exceeding the limits during subsequent SSO Compaction. A possible approach is: DeltaWALUploadTask splits the data to be uploaded into batches of 10,000 streams, generating multiple Objects.
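
The batching idea above could be sketched roughly as follows. This is only an illustration under stated assumptions, not AutoMQ's actual DeltaWALUploadTask code: the class name StreamBatcher, the method splitIntoBatches, and the constant MAX_STREAMS_PER_OBJECT are hypothetical names, and the stream IDs stand in for whatever per-stream data the upload task really carries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of AutoMQ: splits the streams of one upload
// into groups so that each generated Object references a bounded number of streams.
public class StreamBatcher {
    // Assumed cap, taken from the 10,000 figure suggested in the comment above.
    public static final int MAX_STREAMS_PER_OBJECT = 10_000;

    // Returns consecutive batches of at most maxPerBatch elements, preserving order.
    public static <T> List<List<T>> splitIntoBatches(List<T> streams, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < streams.size(); from += maxPerBatch) {
            int to = Math.min(from + maxPerBatch, streams.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(streams.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Illustrative input only: 38,457 placeholder stream IDs.
        List<Long> streamIds = new ArrayList<>();
        for (long id = 0; id < 38_457; id++) {
            streamIds.add(id);
        }
        List<List<Long>> batches = splitIntoBatches(streamIds, MAX_STREAMS_PER_OBJECT);
        for (List<Long> batch : batches) {
            System.out.println("object with " + batch.size() + " streams");
        }
    }
}
```

With each Object limited this way, a later force split of one Object would presumably generate records for at most 10,000 streams at once. Whether that keeps every atomic commit under maxRecordsPerBatch depends on how many records each stream contributes, which the thread does not state.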