Closed HenryCaiHaiying closed 2 months ago
Probably a race condition. This is the log from TopicBasedRemoteLogMetadataManager during that time:
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:05,577] INFO Initializing the resources. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:06,687] INFO Topic [__remote_log_metadata] already exists (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:06,758] INFO Sleep for : 5000 ms before it is retried again. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:13,343] INFO Initialized resources successfully. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
It's hitting the catch block below. An exception was thrown during initialization, but it was never printed:
// If topic is already created, validate the existing topic partitions.
try {
    String topicName = remoteLogMetadataTopicRequest.name();
    // If the existing topic partition size is not same as configured, mark initialization as failed and exit.
    if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) {
        initializationFailed = true;
    }
} catch (Exception e) {
    log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
    Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
    continue;
}
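The catch block above sleeps and retries without recording why the attempt failed. As a rough illustration of the alternative, here is a minimal stdlib-only sketch of a retry loop that logs the exception before sleeping. `LoggingRetry` and its parameters are hypothetical names for this example and are not part of Kafka; the real code would presumably just pass `e` to its existing logger.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper (not Kafka code): retries a check and logs why each attempt failed. */
final class LoggingRetry {
    private static final Logger LOG = Logger.getLogger(LoggingRetry.class.getName());

    /**
     * Calls check until it returns without throwing or maxAttempts is exhausted.
     * Returns the check's result; rethrows the last exception if every attempt failed.
     */
    static <T> T retry(Callable<T> check, int maxAttempts, long retryIntervalMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return check.call();
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                // Passing the exception to the logger records its stack trace,
                // so the underlying cause is visible in the log.
                LOG.log(Level.INFO, "Attempt " + attempt + " of " + maxAttempts + " failed", e);
                if (attempt < maxAttempts) {
                    Thread.sleep(retryIntervalMs);
                }
            }
        }
        throw last;
    }
}
```

With the exception passed to the logger, the first failed attempt would show its cause in the broker log instead of only the "Sleep for" message.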
This error is probably not fatal; it seems RLM can recover.
Yes, it works despite the error. But of course it still needs fixing.
I added a log statement, and this is the underlying exception when the first initialization attempt failed:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.isPartitionsCountSameAsConfigured(TopicBasedRemoteLogMetadataManager.java:450)
at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.initializeResources(TopicBasedRemoteLogMetadataManager.java:397)
at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$configure$1(TopicBasedRemoteLogMetadataManager.java:356)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
The topic is __remote_log_metadata, which should already have been created. AdminClient.describeTopics failed on the first try.
The AdminClient is connecting to localhost:9092; the Kafka server might not be ready yet to answer the describe call.
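The stack trace shows the real error wrapped in a java.util.concurrent.ExecutionException, so code that wants to treat "topic not served yet" differently from other failures has to inspect the cause chain. The sketch below shows that unwrapping with the standard library only. `TopicNotReadyException` is a hypothetical stand-in for Kafka's UnknownTopicOrPartitionException, and `CauseInspector` is an illustrative name, not an existing Kafka class.

```java
/** Hypothetical stand-in for org.apache.kafka.common.errors.UnknownTopicOrPartitionException. */
final class TopicNotReadyException extends RuntimeException {
    TopicNotReadyException(String message) {
        super(message);
    }
}

/** Hypothetical helper (not Kafka code) for inspecting wrapped exceptions. */
final class CauseInspector {
    private CauseInspector() {
    }

    /** Returns the first throwable in the cause chain that is an instance of type, or null if none is. */
    static <T extends Throwable> T findCause(Throwable error, Class<T> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
        }
        return null;
    }

    /** True when the failure, possibly wrapped, was caused by the topic not being served yet. */
    static boolean isTopicNotReady(Throwable error) {
        return findCause(error, TopicNotReadyException.class) != null;
    }
}
```

A caller could catch the ExecutionException from the future's get(), retry quietly when isTopicNotReady returns true, and log anything else at a higher level.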
Adding this property helps shorten the waiting cycle:
rlmm.config.remote.log.metadata.initialization.retry.interval.ms=500
Saw the following exception when the Kafka broker was restarted. It looks like TopicBasedRemoteLogMetadataManager was not initialized in time:
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:07,283] INFO Created a new task: class kafka.log.remote.RemoteLogManager$RLMTask[HnwaKTgCRz2dp901XqiO9g:topic1-0] and getting scheduled (kafka.log.remote.RemoteLogManager)
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:07,286] INFO Scheduling runnable class kafka.log.remote.RemoteLogManager$RLMTask[HnwaKTgCRz2dp901XqiO9g:topic1-0] with initial delay: 0, fixed delay: 5000 (kafka.log.remote.RLMScheduledThreadPool)
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:07,291] INFO [RemoteLogManager=0 partition=HnwaKTgCRz2dp901XqiO9g:topic1-0] Find the highest remote offset for partition: HnwaKTgCRz2dp901XqiO9g:topic1-0 after becoming leader, leaderEpoch: 0 (kafka.log.remote.RemoteLogManager$RLMTask)
kafka-tiered-storage-demo-kafka-1 | [2023-06-08 18:55:07,310] ERROR [RemoteLogManager=0 partition=HnwaKTgCRz2dp901XqiO9g:topic1-0] Error occurred while copying log segments of partition: HnwaKTgCRz2dp901XqiO9g:topic1-0 (kafka.log.remote.RemoteLogManager$RLMTask)
kafka-tiered-storage-demo-kafka-1 | java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
kafka-tiered-storage-demo-kafka-1 | at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:501)
kafka-tiered-storage-demo-kafka-1 | at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.highestOffsetForEpoch(TopicBasedRemoteLogMetadataManager.java:224)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager.$anonfun$findHighestRemoteOffset$2(RemoteLogManager.scala:612)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager.$anonfun$findHighestRemoteOffset$2$adapted(RemoteLogManager.scala:609)
kafka-tiered-storage-demo-kafka-1 | at scala.Option.foreach(Option.scala:437)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager.$anonfun$findHighestRemoteOffset$1(RemoteLogManager.scala:609)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager.$anonfun$findHighestRemoteOffset$1$adapted(RemoteLogManager.scala:608)
kafka-tiered-storage-demo-kafka-1 | at scala.Option.foreach(Option.scala:437)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager.findHighestRemoteOffset(RemoteLogManager.scala:608)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager$RLMTask.maybeUpdateReadOffset$1(RemoteLogManager.scala:394)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.scala:399)
kafka-tiered-storage-demo-kafka-1 | at kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.scala:576)
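The trace shows the RLM task calling into the metadata manager while `initialized` was still false. One possible way to avoid that, sketched here under my own assumptions and not taken from the Kafka codebase, is to have callers wait on a latch for a bounded time instead of failing immediately. `InitializationGate` and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Kafka code): lets callers wait for background initialization to finish. */
final class InitializationGate {
    private final CountDownLatch initialized = new CountDownLatch(1);

    /** Called once by the initializer thread after resources are ready. */
    void markInitialized() {
        initialized.countDown();
    }

    /** Non-blocking check, comparable to testing an "initialized" flag. */
    boolean isInitialized() {
        return initialized.getCount() == 0;
    }

    /**
     * Blocks for at most timeoutMs milliseconds.
     * Returns true if initialization completed within that time, false otherwise.
     */
    boolean awaitInitialized(long timeoutMs) throws InterruptedException {
        return initialized.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

A task could call awaitInitialized with a short timeout and skip the current run when it returns false, which would turn the ERROR above into a quiet retry on the next scheduled run. Whether blocking a scheduler thread is acceptable here is a design question this sketch does not settle.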