aiven / kafka

Mirror of Apache Kafka
Apache License 2.0
2 stars 1 forks source link

Race condition between ReplicaFetcherThread and RemotePartitionMetadataStore. #33

Closed HenryCaiHaiying closed 1 month ago

HenryCaiHaiying commented 1 year ago

[2023-06-26 22:11:04,980] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for topic4-0 (kafka.server.ReplicaFetcherThread) org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: IWXjwq-vSsS8cu9DkGj6EQ:topic4-0

HenryCaiHaiying commented 1 year ago

https://the-asf.slack.com/archives/C05A1NF5SFM/p1687845983795619

HenryCaiHaiying commented 1 year ago

Stack trace of the error:

[2023-06-26 22:11:04,980] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for topic4-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: IWXjwq-vSsS8cu9DkGj6EQ:topic4-0
    at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:152)
    at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.remoteLogSegmentMetadata(RemotePartitionMetadataStore.java:164)
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:211)
    at kafka.log.remote.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.scala:790)
    at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$2(ReplicaFetcherThread.scala:192)
    at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$2$adapted(ReplicaFetcherThread.scala:188)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$1(ReplicaFetcherThread.scala:188)
    at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$1$adapted(ReplicaFetcherThread.scala:186)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.ReplicaFetcherThread.buildRemoteLogAuxState(ReplicaFetcherThread.scala:186)
    at kafka.server.AbstractFetcherThread.$anonfun$fetchOffsetAndBuildRemoteLogAuxState$2(AbstractFetcherThread.scala:734)
    at kafka.server.AbstractFetcherThread.fetchOffsetAndApplyFun(AbstractFetcherThread.scala:707)
    at kafka.server.AbstractFetcherThread.fetchOffsetAndBuildRemoteLogAuxState(AbstractFetcherThread.scala:733)
    at kafka.server.AbstractFetcherThread.handleOffsetMovedToTieredStorage(AbstractFetcherThread.scala:748)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:393)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:329)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:328)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:328)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:127)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:127)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:108)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
HenryCaiHaiying commented 1 year ago
[2023-06-26 22:11:04,779] INFO Initializing the resources. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,935] INFO Received leadership notifications with leader partitions [] and follower partitions [bgi8NmhaSHKQnf0nuK7Eaw:topic3-0, IWXjwq-vSsS8cu9DkGj6EQ:topic4-0] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,978] INFO Initialized resources successfully. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,979] INFO Assign partitions: [bgi8NmhaSHKQnf0nuK7Eaw:topic3-0, IWXjwq-vSsS8cu9DkGj6EQ:topic4-0] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,980] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for topic4-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: IWXjwq-vSsS8cu9DkGj6EQ:topic4-0

[2023-06-26 22:11:05,025] INFO Received leadership notifications with leader partitions [27Fe4HEbQ4KUKGO6wupDqg:topic2-0] and follower partitions [] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:05,025] INFO Assign partitions: [27Fe4HEbQ4KUKGO6wupDqg:topic2-0] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
mdedetrich commented 1 year ago

Thanks for reporting this. I am back from holidays so I am going to start looking into this.

jeqo commented 1 year ago

Testing with upstream + retention PR, I can reproduce a similar issue:

[2023-07-10 12:32:37,873] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for t3-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: t3-0, currentLeaderEpoch: 1, leaderLocalLogStartOffset: 54522, leaderLogStartOffset: 0, epoch: 1as the previous remote log segment metadata was not found
        at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:252)

and second broker is not able to replicate.

Steps to reproduce:

Using:

Create topic with RF=2

make rf=2 topic=t3 create_topic_ts_by_size
docker exec -e KAFKA_OPTS= kafka-ts \
        kafka-topics \
        --bootstrap-server kafka:29092 \
        --create \
        --config remote.storage.enable=true \
        --config segment.bytes=1000000  \
        --config retention.bytes=20000000  \
        --config local.retention.bytes=5000000  \
        --partitions 1 \
        --replication-factor 2 \
        --topic t3
Created topic t3.

Stop second broker:

dc stop kafka1
[+] Stopping 1/1
 ✔ Container demo-kafka1-1  Stopped                                                                           0.9s

Write to topic:

make topic=t3 fill_topic
docker exec -e KAFKA_OPTS= kafka-ts \
        kafka-producer-perf-test --producer-props bootstrap.servers=kafka:29092 \
        --topic t3 \
        --num-records 60000 \
        --record-size 1024 \
        --throughput 1000
5002 records sent, 1000.4 records/sec (0.98 MB/sec), 1.1 ms avg latency, 216.0 ms max latency.
5002 records sent, 1000.4 records/sec (0.98 MB/sec), 0.4 ms avg latency, 20.0 ms max latency.
5000 records sent, 1000.0 records/sec (0.98 MB/sec), 0.9 ms avg latency, 67.0 ms max latency.
5001 records sent, 1000.0 records/sec (0.98 MB/sec), 0.5 ms avg latency, 21.0 ms max latency.
5001 records sent, 1000.0 records/sec (0.98 MB/sec), 1.1 ms avg latency, 80.0 ms max latency.
5000 records sent, 1000.0 records/sec (0.98 MB/sec), 0.4 ms avg latency, 18.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.98 MB/sec), 0.6 ms avg latency, 39.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.98 MB/sec), 0.4 ms avg latency, 18.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.98 MB/sec), 2.0 ms avg latency, 125.0 ms max latency.
5002 records sent, 1000.0 records/sec (0.98 MB/sec), 0.4 ms avg latency, 19.0 ms max latency.
5002 records sent, 1000.2 records/sec (0.98 MB/sec), 1.2 ms avg latency, 87.0 ms max latency.
60000 records sent, 999.900010 records/sec (0.98 MB/sec), 0.77 ms avg latency, 216.00 ms max latency, 0 ms 50th, 1 ms 95th, 16 ms 99th, 79 ms 99.9th.

Check directories:

Broker 0:

make topic=t3 kafka_container=kafka-ts show_local_data | grep '.log$'
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:22 00000000000000054522.log
-rw-r--r--. 1 appuser appuser 975K Jul 10 12:22 00000000000000055464.log
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:23 00000000000000056403.log
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:23 00000000000000057343.log
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:23 00000000000000058283.log
-rw-r--r--. 1 appuser appuser 807K Jul 10 12:23 00000000000000059223.log

Broker 1:

make topic=t3 kafka_container=demo-kafka1-1 show_local_data | grep '.log$'
Error response from daemon: Container 00cbed84d7a8a46e0f6610e76c5d7dedd4b57a3016549aec6a73d1bdc8e15452 is not running

Then, start Broker 1, observe logs and directory:

[2023-07-10 12:32:37,873] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for t3-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: t3-0, currentLeaderEpoch: 1, leaderLocalLogStartOffset: 54522, leaderLogStartOffset: 0, epoch: 1as the previous remote log segment metadata was not found
        at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:252)
        at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:102)
        at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:761)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:412)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
        at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
        at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
jeqo commented 1 year ago

Update: there seem to be a misconfiguration on my environment (upstream) that lead to this issue.

User topic was configured with RF of 2, but internal topics were not, TBRLMM was complaining:

kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-19 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-15 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-29 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-9 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-45 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,887] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-35 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-17 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-31 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-13 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-25 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-47 from OfflinePartition to OnlinePartition (state.change.logger)

By applying this fix (https://github.com/aiven/tiered-storage-for-apache-kafka/commit/2a36ac086c88effc5d673e5b1870c19f076a0fd5) I haven't been able to reproduce this issue, and replication started to work as expected:

Screencast from 2023-07-11 11-42-48.webm

There's still more scenarios to test about replication in general, but wanted to note here that at least this scenario is not reproducible with upstream. @HenryCaiHaiying could you confirm if the internal topics (__remote_log_metadata specifically) have the proper replication factor on your environment?

jeqo commented 1 year ago

Please, ignore my latest comments. The race condition seems to be possible in upstream as well -- following up in this issue: https://issues.apache.org/jira/browse/KAFKA-15181

mdedetrich commented 1 year ago

Upstream PR at https://github.com/apache/kafka/pull/14012

HenryCaiHaiying commented 1 year ago

I was able to reproduce the similar ERROR:

org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: t3-0, currentLeaderEpoch: 1, leaderLocalLogStartOffset: 54522, leaderLogStartOffset: 0, epoch: 1as the previous remote log segment metadata was not found

Under this test setup: Kafka node-3 died, launched a replacement broker: node-4 and reassign the partitions from node-3 to node-4. Node-4 broker starts up prints out the above ERROR repeatedly. In my test up, the topic has a shorter retention.ms with only a few hours (for both local and remote), because there was no new incoming data to the topic, all remote segments were deleted. When PrimaryConsumerTask is handling DELETE_SEGMENT_FINISHED metadata event, it removes idToSegmentMetadata for the given segment in RemoteLogMetadataCache. This caused the lookup failed in the following line in TopicBasedRemoteLogMetadataManager:

  override protected def buildRemoteLogAuxState(partition: TopicPartition,
                                                currentLeaderEpoch: Int,
                                                leaderLocalLogStartOffset: Long,
                                                leaderLogStartOffset: Long): Unit = {
    replicaMgr.localLog(partition).foreach { log =>
      if (log.remoteLogEnabled()) {
        replicaMgr.remoteLogManager.foreach { rlm =>
          var rlsMetadata: Optional[RemoteLogSegmentMetadata] = Optional.empty()
          val epoch = log.leaderEpochCache.flatMap(cache => cache.epochForOffset(leaderLocalLogStartOffset))
          if (epoch.isDefined) {
            rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, epoch.get, leaderLocalLogStartOffset)

Not sure about the best way to fix this, maybe keeping remote log segment retention really long? Or always keep the last segment in RemoteLogMetadataCache?

mdedetrich commented 1 year ago

New upstream bug is at https://github.com/apache/kafka/pull/14127