Aiven-Open / tiered-storage-for-apache-kafka

RemoteStorageManager for Apache Kafka® Tiered Storage
Apache License 2.0

Retrieve message has error in logs #470

Open bingkunyangvungle opened 9 months ago

bingkunyangvungle commented 9 months ago

What happened?

When I retrieve messages from a client (Maven kafka-clients version 3.4.0), the client receives the messages, but the broker logs the following error:

[2023-12-14 14:23:49,286] ERROR Error occurred while reading the remote data for user_s3-0 (kafka.log.remote.RemoteLogReader)
org.apache.kafka.common.KafkaException: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$7(RemoteIndexCache.java:379)
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:342)
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.createCacheEntry(RemoteIndexCache.java:375)
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$getIndexEntry$6(RemoteIndexCache.java:365)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
    at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
    at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
    at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.getIndexEntry(RemoteIndexCache.java:364)
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.lookupOffset(RemoteIndexCache.java:436)
    at kafka.log.remote.RemoteLogManager.lookupPositionForOffset(RemoteLogManager.java:1326)
    at kafka.log.remote.RemoteLogManager.read(RemoteLogManager.java:1272)
    at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:62)
    at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:31)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
    at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:532)
    at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.lambda$fetchIndex$5(ClassLoaderAwareRemoteStorageManager.java:89)
    at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.withClassLoader(ClassLoaderAwareRemoteStorageManager.java:66)
    at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.fetchIndex(ClassLoaderAwareRemoteStorageManager.java:89)
    at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$7(RemoteIndexCache.java:377)
    ... 19 more
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
    at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.get(SegmentManifestProvider.java:87)
    at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchSegmentManifest(RemoteStorageManager.java:552)
    at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:507)
    ... 23 more
Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:386)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
    at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.get(SegmentManifestProvider.java:69)
    ... 25 more

What did you expect to happen?

The broker should not log any errors.

What else do we need to know?

With the released package, the broker logged the error and the client also could not get the messages. With a package I built myself from commit e06cae8 on the main branch, the broker still logged the error, but the client could get the messages.

ivanyu commented 9 months ago

Hi @bingkunyangvungle, thanks for reporting this. This is a known issue that originates on the broker side. We're working on a workaround at the plugin level and also on a fix on the broker side. For now, please try increasing fetch.max.wait.ms (e.g. to 2 seconds).
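For reference, fetch.max.wait.ms is a consumer setting expressed in milliseconds (default 500), so the suggested 2 seconds corresponds to 2000. Below is a minimal sketch of consumer properties with that value, assuming a Java client. The class name, bootstrap address, group id, and String deserializers are placeholders chosen for illustration, not values from this thread.

```java
import java.util.Properties;

// Illustrative sketch only: the class name and all values except
// fetch.max.wait.ms are hypothetical placeholders.
public class FetchWaitConfigSketch {

    // Builds consumer properties with a longer fetch wait.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer default is 500 ms; 2000 ms matches the "2 seconds" suggestion.
        props.setProperty("fetch.max.wait.ms", "2000");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group id; replace with real values.
        Properties props = consumerProps("localhost:9092", "demo-group");
        System.out.println("fetch.max.wait.ms=" + props.getProperty("fetch.max.wait.ms"));
    }
}
```

These properties can be passed as-is to the KafkaConsumer constructor from kafka-clients. The sketch avoids importing the client library so that it compiles on its own.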

bingkunyangvungle commented 9 months ago

Thank you @ivanyu for the follow-up. I'll try modifying the configuration for now.

jeqo commented 8 months ago

I have created https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/483 to track the resolution of this issue on the TS framework.

@bingkunyangvungle we have recently added an async cache for indexes (#472), which resolved the last scenario we had identified in which this exception could leave consumers blocked. Even though the exception will still be thrown, async caches should allow the consumer to eventually read values locally and make progress.
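To illustrate why an async cache lets a consumer make progress even when the exception is thrown, here is a minimal sketch that uses only the JDK. It is not the plugin's code and not Caffeine's implementation: the class AsyncIndexCacheSketch, its loader, and the key "segment-0" are hypothetical. The idea it shows is that the load runs on a separate pool, so interrupting the thread that waits on it aborts only the wait, and a later read finds the completed value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical illustration; not the plugin's or Caffeine's implementation.
public class AsyncIndexCacheSketch<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final ExecutorService loaderPool;

    public AsyncIndexCacheSketch(Function<K, V> loader, ExecutorService loaderPool) {
        this.loader = loader;
        this.loaderPool = loaderPool;
    }

    // The load runs on loaderPool, so interrupting the caller only aborts the wait.
    public V get(K key) throws InterruptedException, ExecutionException {
        CompletableFuture<V> future = entries.computeIfAbsent(
            key, k -> CompletableFuture.supplyAsync(() -> loader.apply(k), loaderPool));
        return future.get();
    }

    // Simulates a reader that is interrupted mid-fetch, followed by a retry.
    public static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AsyncIndexCacheSketch<String, String> cache =
                new AsyncIndexCacheSketch<String, String>(key -> {
                    try {
                        Thread.sleep(500); // stand-in for a slow remote fetch
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "index-for-" + key;
                }, pool);

            AtomicBoolean firstReadInterrupted = new AtomicBoolean(false);
            Thread reader = new Thread(() -> {
                try {
                    cache.get("segment-0");
                } catch (InterruptedException e) {
                    firstReadInterrupted.set(true); // the wait was aborted, not the load
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            reader.start();
            Thread.sleep(100);  // let the reader start waiting on the load
            reader.interrupt(); // stand-in for the reader thread being interrupted
            reader.join();

            // The retry reuses the same future and gets the loaded value.
            String value = cache.get("segment-0");
            return firstReadInterrupted.get() ? value : "first read was not interrupted";
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints index-for-segment-0: the first reader fails with InterruptedException, yet the retry succeeds without starting a second load. With a synchronous cache the interrupted load would be lost and every retry would start over.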

Let us know if @ivanyu's suggestion was enough for your environment.