StarRocks / starrocks-connector-for-kafka

StarRocks Kafka Connector rewinds offsets too far on connection failures #33

Open wzorgdrager opened 1 month ago

When the StarRocks cluster becomes temporarily unavailable, the Kafka connector rewinds consumer offsets far further back than necessary, discarding the offsets of loads that had already completed successfully. The result is duplicate processing of millions of records that were already loaded into StarRocks.

The following happened:

  1. Connector successfully loads data for hours/days (visible in logs: numberOfSuccessLoad=2422, totalSuccessLoadRows=22033454)
  2. When StarRocks becomes temporarily unavailable, connector fails with: WARN [starrocks|task-0] Stream load failure, com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: ****, table: ****
  3. Instead of rewinding only to the last in-progress batch, the connector rewinds to a much earlier offset (Connect's commit/rewind contract, as I understand it, is sketched below this list): ERROR [starrocks|task-0] WorkerSinkTask{id=starrocks-0} Offset commit failed, rewinding to last committed offsets
  4. This causes the consumer lag to jump dramatically; in our case, from 0 to 30 million messages (screenshot attached).
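
For reference, my understanding of the Kafka Connect sink contract, which is what makes step 3 surprising: the framework commits whatever offsets the task reports as safely flushed, and those last committed offsets are the baseline it rewinds to when a later commit fails. A minimal, hypothetical sink task sketching that contract (this is not the actual connector code, just the framework API):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class IllustrativeSinkTask extends SinkTask {
    // Offsets the sink has durably flushed, i.e. offsets that are safe to commit.
    private final Map<TopicPartition, OffsetAndMetadata> flushed = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord r : records) {
            // In a real sink this would only happen after a successful load;
            // the offset to commit is the *next* offset to consume.
            flushed.put(new TopicPartition(r.topic(), r.kafkaPartition()),
                        new OffsetAndMetadata(r.kafkaOffset() + 1));
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Connect commits what is returned here. If nothing newer than the
        // last successful commit is ever reported (or the commit itself throws),
        // the consumer is rewound all the way to the last committed offsets.
        return new HashMap<>(flushed);
    }

    @Override public String version() { return "0"; }
    @Override public void start(Map<String, String> props) {}
    @Override public void stop() {}
}

Given that contract, I would expect a failed stream load to rewind at most to the batch that was in flight, not past thousands of already-committed loads.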

The full logs during this event:

[2024-10-19 07:38:26,027] INFO [starrocks|task-0] StreamLoadManagerV2 close, loadMetrics: LoadMetrics{startTimeNano=12141759914285940, totalRunningTimeNano=14473446449494, numberOfSuccessLoad=2422, totalSuccessLoadBytes=11348292400, totalSuccessLoadRows=22033454, totalSuccessLoadTimeNano=619523950125, numberOfFailedLoad=0, numberWriteTriggerFlush=157, numberWriteBlock=0, totalWriteBlockTimeNano=0}, flushAndCommit: FlushAndCommitStrategy{expectDelayTime=20000, scanFrequency=50, ageThreshold=400, maxCacheBytes=67108864, enableAutoCommit=true, numAgeTriggerFlush=2366, numCacheTriggerFlush=1135, numTableTriggerFlush=0} (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:411)
[2024-10-19 07:38:26,027] INFO [starrocks|task-0] Default Stream loader closed (com.starrocks.data.load.stream.DefaultStreamLoader:140)
[2024-10-19 07:38:26,027] WARN [starrocks|task-0] Stream load failure, com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: ****, table: ****, label: -78b164f4-d6a9-48f7-9c8a-d96a150aa708, 
[2024-10-19 07:38:26,027] WARN [starrocks|task-0] Current retry times is 0 (com.starrocks.connector.kafka.StarRocksSinkTask:363)
[2024-10-19 07:38:26,027] INFO [starrocks|task-0] FlushAndCommitStrategy{expectDelayTime=20000, scanFrequency=50, ageThreshold=400, maxCacheBytes=67108864, enableAutoCommit=true, numAgeTriggerFlush=0, numCacheTriggerFlush=0, numTableTriggerFlush=0} (com.starrocks.data.load.stream.v2.FlushAndCommitStrategy:56)
[2024-10-19 07:38:26,027] INFO [starrocks|task-0] StarRocks-Sink-Manager start, enableAutoCommit: true, streamLoader: com.starrocks.data.load.stream.TransactionStreamLoader, GitInformation{gitBuildTime='2023-11-30T14:05:45+0800', gitCommitId='7ff48ad17c654c7aa58a3d825e19ac1e3d714c71', gitCommitIdAbbrev='7ff48ad', gitCommitTime='2023-11-30T14:01:41+0800'} (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:233)
[2024-10-19 07:38:26,027] INFO [starrocks|task-0] manager running, scanningFrequency : 50 (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:157)
[2024-10-19 07:38:26,029] INFO [prod-starrocks|task-0] Default Stream Loader start, properties : ****
[2024-10-19 07:38:26,029] ERROR [starrocks|task-0] WorkerSinkTask{id=starrocks} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:430)
[2024-10-19 07:38:26,029] INFO [starrocks|task-0] [Consumer clientId=connector-consumer-starrocks, groupId=starrocks] Seeking to offset **** for partition ****-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2024-10-19 07:38:26,029] INFO [starrocks|task-0] [Consumer clientId=connector-consumer-starrocks, groupId=starrocks] Seeking to offset **** for partition ****-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2024-10-19 07:38:26,029] INFO [starrocks|task-0] [Consumer clientId=connector-consumer-starrocks, groupId=starrocks] Seeking to offset **** for partition ****-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2024-10-19 07:38:26,029] INFO [starrocks|task-0] [Consumer clientId=connector-consumer-starrocks, groupId=starrocks] Seeking to offset **** for partition ****-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2024-10-19 07:38:26,029] INFO [starrocks|task-0] [Consumer clientId=connector-consumer-starrocks, groupId=starrocks] Seeking to offset **** for partition ****-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2024-10-19 07:38:26,029] ERROR [starrocks|task-0] WorkerSinkTask{id=starrocks} Commit of offsets threw an unexpected exception for sequence number 1332: null (org.apache.kafka.connect.runtime.WorkerSinkTask:285)
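
The repeated "Seeking to offset ... for partition ..." lines above are the framework moving each partition back to its last committed offset after the failed commit. At the consumer level the effect is roughly the following (a sketch of the observable behavior, not the actual WorkerSinkTask source):

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class RewindSketch {
    // Seek every assigned partition back to its last committed offset,
    // producing one "Seeking to offset X for partition Y" log line each.
    static void rewindToLastCommitted(Consumer<?, ?> consumer,
                                      Set<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
            if (e.getValue() != null) {
                consumer.seek(e.getKey(), e.getValue().offset());
            }
        }
    }
}

So if no offset commit has succeeded for a long stretch, "last committed" can be arbitrarily far behind the records already loaded into StarRocks, which would explain the jump to 30 million messages of lag.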

Some relevant configurations:

"bufferflush.intervalms": "20000",
"connect.timeoutms": "60000",
"consumer.override.enable.auto.commit": "true"

Is there a configuration I'm missing, or is this a bug in the connector?