AutoMQ / automq

AutoMQ is a cloud-first alternative to Kafka that decouples durability to S3 and EBS. 10x cost-effective. No cross-AZ traffic cost. Autoscales in seconds. Single-digit ms latency.
https://www.automq.com/docs

[BUG] Empty stream get invalid endOffset from Controller #1222

Closed: superhx closed this issue 5 months ago

superhx commented 6 months ago

What went wrong?

Partition __consumer_offsets-27 went through an unclean shutdown and then failed during recovery: rebuilding the time index raised 'fetch startOffset 1980 is greater than endOffset 1968', which left the partition unable to open.

05-04 19:17:23 trim the stream streamId=375, streamEpoch=0, trimOffset=1980

[2024-05-04 19:17:58,309] INFO Upload delta WAL CommitStreamSetObjectRequest{objectId=258619, orderId=258619, objectSize=32690327, streamRanges=[..., (375--1,1968-1980-12),...], streamObjects=[...], compactedObjectIds=null}, cost 4866ms, rate limiter 7.17726016E7bytes/s (com.automq.stream.s3.DeltaWALUploadTask)

[2024-05-04 19:32:22,325] INFO [Controller 1] [CommitStreamSetObject]: successfully commit stream set object. streamSetObjectId=258668, nodeId=1, nodeEpoch=1713855737670, compacted objects: [258626, 258562, 258496, 258503, 258570, 258633, 258510, 258577, 258640, 258647, 258523, 258584, 258591, 258654, 258530, 258598, 258537, 258605, 258477, 258612, 258548, 258619, 258555, 258489, 258488]

05-06 18:53:35 recover try close stream ... streamId=375, epoch=0, startOffset=1980, endOffset=1968, state=OPEN

[2024-05-07 15:41:43,600] INFO [ElasticLog partition=__consumer_offsets-27 epoch=6] save log meta ElasticLogMeta{streamMap={log=238, tim=375, tim.cleaned=695, txn.cleaned=-1, txn=-1, log.cleaned=694}, lastNthSegmentMetas=[ElasticStreamSegmentMeta{baseOffset=0, createTimestamp=1714821442539, lastModifiedTimestamp=1714614140818, streamSuffix='.cleaned', logSize=1915, log=[1290164, 3457086], time=[12, 24], txn=[0, 0], firstBatchTimestamp=1714275140818, timeIndexLastEntry=TimestampOffsetData{timestamp=1714614140818, offset=2166921}}, ElasticStreamSegmentMeta{baseOffset=2166922, createTimestamp=1714821433823, lastModifiedTimestamp=1714821433824, streamSuffix='', logSize=125, log=[2166922, -1], time=[1980, -1], txn=[0, -1], firstBatchTimestamp=1714821433823, timeIndexLastEntry=TimestampOffsetData{timestamp=-1, offset=2166922}}]} (kafka.log.streamaspect.ElasticLog$)

[2024-05-07 15:41:43,644] ERROR [Stream id=375 epoch=6] stream fetch [1980, 1968) 12 fail (com.automq.stream.s3.S3Stream)
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: fetch startOffset 1980 is greater than endOffset 1968
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1177)
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
    at com.automq.stream.s3.S3Stream.fetch(S3Stream.java:191)
    at com.automq.stream.s3.S3StreamClient$StreamWrapper.fetch(S3StreamClient.java:261)
    at kafka.log.streamaspect.AlwaysSuccessClient$StreamImpl.fetch(AlwaysSuccessClient.java:247)
    at kafka.log.streamaspect.LazyStream.fetch(LazyStream.java:130)
    at kafka.log.streamaspect.DefaultElasticStreamSlice.fetch(DefaultElasticStreamSlice.java:74)
    at kafka.log.streamaspect.ElasticStreamSlice.fetch(ElasticStreamSlice.java:52)
    at kafka.log.streamaspect.ElasticStreamSlice.fetch(ElasticStreamSlice.java:56)
    at kafka.log.streamaspect.ElasticTimeIndex.parseEntry(ElasticTimeIndex.scala:101)
    at kafka.log.streamaspect.ElasticTimeIndex.$anonfun$entry$1(ElasticTimeIndex.scala:80)
    at kafka.log.streamaspect.AbstractStreamIndex.maybeLock(AbstractStreamIndex.scala:192)
    at kafka.log.streamaspect.ElasticTimeIndex.entry(ElasticTimeIndex.scala:76)
    at kafka.log.streamaspect.ElasticTimeIndex.loadLastEntry(ElasticTimeIndex.scala:61)
    at kafka.log.streamaspect.ElasticLogSegment.recover0(ElasticLogSegment.scala:215)
    at kafka.log.streamaspect.ElasticLogSegment.recover(ElasticLogSegment.scala:206)
    at kafka.log.streamaspect.ElasticLogLoader.recoverSegment(ElasticLogLoader.scala:107)
    at kafka.log.streamaspect.ElasticLogLoader.recoverLog(ElasticLogLoader.scala:155)

What is the reason?

Summary: The endOffset of a stream range is rebuilt by replaying StreamSetObject and StreamObject records. When trim progress exceeds upload progress, the replayed endOffset can end up smaller than startOffset.

Detail:

  1. xx-xx xx:xx:xx. create streamId=375 and generate RangeRecord{index=0, start=0, end=0};
  2. xx-xx xx:xx:xx. __consumer_offsets-27 appends consumer offsets and streamId=375's (in-memory) endOffset grows to 1968;
  3. 05-04 19:17:23 streamId=375 is trimmed to 1980 because of topic compaction; StreamControllerManager#trimStream
  4. 05-04 19:17:58 nodeId=0 uploads the WAL to S3 as objectId=258619, and streamId=375's (in-memory) endOffset is updated to 1980; StreamControllerManager#replay(S3StreamSetObjectRecord)
  5. 05-04 19:32:22 stream set object compaction deletes the object generated in step 4 (objectId=258619);
  6. xx-xx xx:xx:xx. the Controller reboots; streamId=375's endOffset can only be replayed to 1968 because objectId=258619 has already been deleted, so getOpeningStreams/openStream return 'streamId=375, epoch=0, startOffset=1980, endOffset=1968';
  7. 05-07 15:41:43 the __consumer_offsets-27 partition recovery gets the wrong endOffset.
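The replay in steps 5-6 can be sketched as a toy model (class and method names here are illustrative, not AutoMQ's actual code): the controller rebuilds endOffset only from stream ranges of objects that still exist, so deleting objectId=258619 loses the advance to 1980.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplayDemo {
    // Rebuild a stream's endOffset from the stream ranges [start, end)
    // of the objects that survived compaction.
    static long replayEndOffset(List<long[]> survivingRanges) {
        long end = 0;
        for (long[] range : survivingRanges) {
            end = Math.max(end, range[1]);
        }
        return end;
    }

    public static void main(String[] args) {
        long trimOffset = 1980;                 // step 3: trim to 1980
        List<long[]> objects = new ArrayList<>();
        objects.add(new long[]{0, 1968});       // earlier uploads, data up to 1968
        objects.add(new long[]{1968, 1980});    // step 4: objectId=258619
        objects.remove(1);                      // step 5: compaction deletes it
        long end = replayEndOffset(objects);    // step 6: replay after reboot
        System.out.println("startOffset=" + trimOffset + " endOffset=" + end);
        // Prints startOffset=1980 endOffset=1968: endOffset < startOffset.
    }
}
```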

Affects versions

1.0.x

Fix

Correct the nextOffset returned by openStream.
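A minimal sketch of that fix direction, under the assumption (names are hypothetical) that openStream clamps the replayed endOffset forward so it can never fall behind the trim/start offset:

```java
public class OpenStreamFix {
    // If trim progress ran ahead of upload progress, the replayed endOffset
    // may lag behind startOffset; hand out max(start, end) as nextOffset.
    static long nextOffset(long startOffset, long replayedEndOffset) {
        return Math.max(startOffset, replayedEndOffset);
    }
}
```

For the case above, `nextOffset(1980, 1968)` yields 1980 instead of the invalid 1968, so recovery no longer sees startOffset > endOffset.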

superhx commented 5 months ago

Another scenario:

  1. Stream advances endOffset to 12 and triggers upload objectId=0.
  2. Stream advances endOffset to 24 and triggers upload objectId=1.
  3. Stream trims to 24; objectId=0 still has unexpired data from other streams, while all data in objectId=1 has expired, so objectId=1 is deleted.
  4. Controller restarts, and the stream's range endOffset can only be replayed to 12.
  5. Stream advances endOffset to 36 and triggers upload objectId=2, but 24 does not match 12, resulting in OFFSET_NOT_MATCH error.
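The failure in step 5 comes down to a strict continuity check: the new range must start exactly at the replayed endOffset. A toy version (hypothetical names, not the controller's actual code):

```java
public class CommitCheck {
    // Strict continuity: the committed object's range must start exactly
    // where the controller believes the stream currently ends.
    static boolean offsetsMatch(long replayedEndOffset, long newRangeStart) {
        return newRangeStart == replayedEndOffset;
    }

    public static void main(String[] args) {
        // objectId=1 covered [12, 24) but was deleted, so replay stops at 12.
        long replayedEnd = 12;
        // objectId=2 continues from the broker's in-memory endOffset, 24.
        System.out.println(offsetsMatch(replayedEnd, 24)); // false -> OFFSET_NOT_MATCH
    }
}
```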
superhx commented 5 months ago

Why is RangeRecord not generated to advance the endOffset of the range when uploading an object? The StreamSetObject contains thousands of streams, if we do so, the controller will generate too many records.

superhx commented 5 months ago


In the 'trim progress exceeds upload progress' scenario, the range endOffset can bounce back (move backwards) after a controller restart.
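One plausible shape for a bounce-back-tolerant check (an assumption about the fix direction, not necessarily the merged code): treat the stream's start (trim) offset as a floor for the replayed endOffset before comparing, since any gap below it is trimmed data, not lost data.

```java
public class TolerantCheck {
    // Accept a new range if it starts at the effective end of the stream,
    // where the effective end is the replayed endOffset floored by the
    // trim/start offset (data below the trim offset is expired by design).
    static boolean accept(long replayedEndOffset, long streamStartOffset,
                          long newRangeStart) {
        long effectiveEnd = Math.max(replayedEndOffset, streamStartOffset);
        return newRangeStart == effectiveEnd;
    }
}
```

In the scenario above, `accept(12, 24, 24)` passes because the gap [12, 24) was trimmed away, while `accept(12, 0, 24)` still fails, as it would indicate genuinely missing data.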

Retrieving and checking logic of endOffset (old):

The fix: