Aiven-Open / tiered-storage-for-apache-kafka

RemoteStorageManager for Apache Kafka® Tiered Storage
Apache License 2.0

fix: handle race condition on cache retention #569

Open jeqo opened 2 months ago

jeqo commented 2 months ago

Cache removal listener-related tests (DiskChunkCacheMetricsTest and MemorySegmentIndexesCacheTest) are flaky. Recent evidence:

To reproduce this locally, @RepeatedTest(10000) has been used.

The failure is a timeout while waiting for a cache entry to be removed:

DiskChunkCacheMetricsTest > metrics() > repetition 279 of 1000 FAILED
    org.awaitility.core.ConditionTimeoutException: Condition with alias 'Deletion happens' didn't complete within 30 seconds because condition with lambda expression in io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCacheMetricsTest that uses javax.management.ObjectName was not fulfilled.
        at app//org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
        at app//org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
        at app//org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
        at app//org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1006)
        at app//org.awaitility.core.ConditionFactory.until(ConditionFactory.java:975)
        at app//io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCacheMetricsTest.metrics(DiskChunkCacheMetricsTest.java:125)

Waiting for the RemovalListener to be called right after inserting a couple of entries does not seem to be deterministic, so a retention.ms time boundary is needed to get the removal called within the time frame of the test (the default retention.ms is 10 min).

As a separate finding, while running this locally, I spotted a file-not-found exception after a few thousand runs:

[2024-07-05 12:26:01,100] INFO DiskChunkCacheConfig values: 
    path = /var/folders/f_/6tkk7f6x7377dzmwfsqkdtk00000gq/T/junit16372691694497514473
    prefetch.max.size = 0
    retention.ms = 600000
    size = 1024
 (io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCacheConfig:370)
[2024-07-05 12:43:47,886] ERROR Failed to delete cached file for key ChunkKey(segmentFileName=segment, chunkId=1) with path /var/folders/f_/6tkk7f6x7377dzmwfsqkdtk00000gq/T/junit11001183003053723613/cache/segment-1 from cache directory. The reason of the deletion is EXPIRED (io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache:111)
java.nio.file.NoSuchFileException: /var/folders/f_/6tkk7f6x7377dzmwfsqkdtk00000gq/T/junit11001183003053723613/cache/segment-1
    at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
    at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
    at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:148)
    at java.base/java.nio.file.Files.readAttributes(Files.java:1851)
    at java.base/java.nio.file.Files.size(Files.java:2468)
    at io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache.lambda$removalListener$0(DiskChunkCache.java:101)
    at com.github.benmanes.caffeine.cache.Async$AsyncEvictionListener.onRemoval(Async.java:117)
    at com.github.benmanes.caffeine.cache.Async$AsyncEvictionListener.onRemoval(Async.java:101)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.notifyEviction(BoundedLocalCache.java:442)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$evictEntry$2(BoundedLocalCache.java:1071)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1828)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.evictEntry(BoundedLocalCache.java:1032)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.expireAfterAccessEntries(BoundedLocalCache.java:939)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.expireAfterAccessEntries(BoundedLocalCache.java:925)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.expireEntries(BoundedLocalCache.java:903)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.maintenance(BoundedLocalCache.java:1721)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.performCleanUp(BoundedLocalCache.java:1660)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache$PerformCleanupTask.run(BoundedLocalCache.java:3886)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

There seem to be multiple calls to this listener happening concurrently, which causes this behavior (the first caller wins and the next one does not find the file), so additional handling has been added. At runtime this exception is swallowed by the listener execution, so this is mostly to have better logging when it happens.

This seems to be expected looking at the Caffeine docs:

The RemovalListener states:

An instance may be called concurrently by multiple threads to process different entries.

Also

Implementations of this interface should avoid performing blocking calls or synchronizing on shared resources.
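
To illustrate the kind of handling described above, here is a minimal sketch using only java.nio. It is not the code from this PR: the class name TolerantCacheFileRemoval, the method deleteCachedFile, and the -1 return value are hypothetical. It assumes that a missing file during removal is an expected outcome of two removals racing for the same path rather than a deletion failure.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class TolerantCacheFileRemoval {

    /**
     * Deletes a cached file and returns its size in bytes, or -1 if the file
     * was already gone (for example, removed by a concurrent listener call).
     * Hypothetical helper, not part of DiskChunkCache.
     */
    static long deleteCachedFile(final Path path) throws IOException {
        try {
            // Throws NoSuchFileException if the file is missing.
            final long size = Files.size(path);
            // Does nothing if another thread deleted the file in between.
            Files.deleteIfExists(path);
            return size;
        } catch (final NoSuchFileException e) {
            // Expected when two removals race for the same file: a caller can
            // log this at a lower level instead of reporting a failed deletion.
            return -1L;
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path file = Files.createTempFile("segment-1", ".chunk");
        Files.write(file, new byte[128]);
        System.out.println(deleteCachedFile(file)); // prints 128: first caller wins
        System.out.println(deleteCachedFile(file)); // prints -1: file is already gone
    }
}
```

Any other IOException still propagates, so genuine deletion failures remain visible.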

AnatolyPopov commented 2 months ago

Sorry, I'm failing to understand what kind of race condition you are talking about? Could you please clarify?

jeqo commented 2 months ago

@AnatolyPopov ofc, sorry it wasn't explained properly. I have added more details to the description. This PR at least tries to fix one of the (now) known causes of the flaky test failures.

AnatolyPopov commented 2 months ago

I wonder why this can happen at all. If I understand correctly, this basically means that the listener runs multiple times for a specific (key, value) pair. Or is it a test-only thing, where the test itself cleans up the file?

jeqo commented 2 months ago

@AnatolyPopov I have refactored the test to use time-based eviction and get more consistent results (before, it tested whether either value 1 or 2 was deleted; now it tests whether 1, 2, or both are deleted).

I have separated out the exception handling for the missing file: it is nice to have, but it doesn't fix the flakiness completely. The refactoring of the test is what tries to fix the flakiness. These are now two separate commits. PTAL

jeqo commented 1 month ago

Finally, some additional evidence that this test is flaky:

jeqo commented 1 month ago

Also, the same test but for the memory based cache is failing on main: https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/actions/runs/10283516010/job/28457507975

Managed to reproduce locally with @RepeatedTest(1000):

[2024-08-08 20:20:35,517] INFO CacheConfig values: 
    retention.ms = -1
    size = 18
 (io.aiven.kafka.tieredstorage.config.CacheConfig:370)

Condition with Lambda expression in io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCacheTest$CacheTests was not fulfilled within 30 seconds.
org.awaitility.core.ConditionTimeoutException: Condition with Lambda expression in io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCacheTest$CacheTests was not fulfilled within 30 seconds.
    at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
    at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
    at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1006)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:975)
    at io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCacheTest$CacheTests.sizeBasedEviction(MemorySegmentIndexesCacheTest.java:262)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
    at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
    at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Where the condition is:

            await()
                .atMost(Duration.ofSeconds(30))
                .pollDelay(Duration.ofSeconds(2))
                .pollInterval(Duration.ofMillis(10))
                .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty());
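
For readers less familiar with Awaitility, the condition above behaves roughly like the plain-Java polling loop sketched below. The names PollUntil and pollUntil are hypothetical and this is not how Awaitility is implemented. The point it illustrates is that the loop only observes the condition: if nothing triggers the removal listener within the time limit, polling alone cannot make it happen and the wait times out.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class PollUntil {

    /** Returns true if the condition became true before the time limit elapsed. */
    static boolean pollUntil(final BooleanSupplier condition,
                             final Duration atMost,
                             final Duration pollDelay,
                             final Duration pollInterval) throws InterruptedException {
        final long deadline = System.nanoTime() + atMost.toNanos();
        // Initial delay before the first check (pollDelay in the test above).
        Thread.sleep(pollDelay.toMillis());
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Pause between checks (pollInterval in the test above).
            Thread.sleep(pollInterval.toMillis());
        }
        // One last check at the deadline.
        return condition.getAsBoolean();
    }

    public static void main(final String[] args) throws InterruptedException {
        // Stand-in for "the removal listener has been invoked".
        final AtomicBoolean listenerCalled = new AtomicBoolean(false);
        final Thread remover = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            listenerCalled.set(true);
        });
        remover.start();

        // The flag is set by the background thread well within the limit.
        System.out.println(pollUntil(listenerCalled::get,
            Duration.ofSeconds(2), Duration.ofMillis(10), Duration.ofMillis(10)));
        remover.join();

        // Nothing ever satisfies this condition, so the wait times out: prints false.
        System.out.println(pollUntil(() -> false,
            Duration.ofMillis(100), Duration.ZERO, Duration.ofMillis(10)));
    }
}
```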