opensource4you / astraea

釋放kafka的無限潛能
Apache License 2.0
125 stars 45 forks source link

[BACKUP] seek offset 超過 latest offset 會 reset 至 earliest #1798

Closed Haser0305 closed 1 year ago

Haser0305 commented 1 year ago

在 sink task 中如果透過 context seek offset 超過 partition 的最新 offset,consumer 會自己 reset offset 至 earliest。 以下是模擬 partition 中 latest offset 為 999。

使用者如果設定 from 為 1200時會出現的 info log,並會持續試圖 seek offset 至 1200,且每次 put 的 records 在此狀況都會是拿 offset 0 -> offset 194,可以確認這邊會重複的從頭開始拿同樣的 records。

INFO [local-test3|task-0] [Consumer clientId=connector-consumer-local-test3-0, groupId=connect-local-test3] Seeking to offset 1200 for partition test3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1595)

INFO [local-test3|task-0] [Consumer clientId=connector-consumer-local-test3-0, groupId=connect-local-test3] Fetch position FetchPosition{offset=1200, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.103.64:15371 (id: 1001 rack: null)], epoch=0}} is out of range for partition test3-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1403)

INFO [local-test3|task-0] [Consumer clientId=connector-consumer-local-test3-0, groupId=connect-local-test3] Resetting offset for partition test3-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.103.64:15371 (id: 1001 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:399)

seek offset 方式是透過 taskContext 先呼叫 requestCommit 再透過 offset 方法設定要前往的 offset。

目前我推測會這樣的原因是因為 Fetcher 紀錄完 resetting offset 後會呼叫 subscriptions.requestOffsetReset(topicPartition) 來 reset 此 topicPartition,在 requestOffsetReset 中是使用 defaultResetStrategy 來確定要將 topicPartition reset 至哪個 offset。

而 KafkaConsumer 的 Subscriptions 中建立時所吃的 defaultResetStrategy 就是 AUTO_OFFSET_RESET_CONFIG,且 Worker 所建立的 baseConsumerConfigs 中 AUTO_OFFSET_RESET_CONFIG 是設定為 earliest。

但因為目前沒有在往下看 requestOffsetReset 之後是怎麼處理 offsetResetStrategy,因此不確定以上的 trace 是否為這現象的原因

chia7712 commented 1 year ago

欸 描述好像是空白的

Haser0305 commented 1 year ago

欸 描述好像是空白的

抱歉,剛剛在打標題時不小心就發出來了,因此描述的部分是用更新補上

chia7712 commented 1 year ago

@Haser0305 感謝補充說明,可否試著“條列”重現的方式?

Haser0305 commented 1 year ago

好的,以下是我如何重現這問題

  1. 創建一個 單一 partition 的 topic 裡面先寫入 1000 筆資料
  2. 使用 #1709 的 branch 在裡面插入一些顯示字串方便觀察,並拔掉確認此 topic partition 有無 seek 過的部分以重現這現象,隨後透過 shadowjar 建立 jar file
  3. 透過上一步新版 connector,新建一個 exporter 並且設定這個 topic partition 的 from 為 1200

以上是我這次測試的步驟,在第二點的字串中可以觀察到每次 put 所得到的 records 都是 offset 從 0 到 194。 之後會一直重複seek,並且消費 0 -> 194 這些資料

chia7712 commented 1 year ago

@Haser0305 please take a look at #1804