ben-manes / caffeine

A high performance caching library for Java
Apache License 2.0

Deadlock after upgrading to 3.0.1 #537

Closed perlambaek closed 3 years ago

perlambaek commented 3 years ago

After upgrading to 3.0.1 we are getting deadlocks across many of our caches after heavy traffic. We have a ton of locked threads, and what most of them have in common is that they are stuck in compute or computeIfAbsent.

Here are a few stacktraces from some of the locked threads. I am not sure if this is enough to debug the matter. We have been using caffeine for several years, and the regression seems to be the upgrade to 3.0.1.

java.util.concurrent.ConcurrentHashMap.compute(java.lang.Object, java.util.function.BiFunction) (line: 1931)
com.github.benmanes.caffeine.cache.BoundedLocalCache.remap(java.lang.Object, java.lang.Object, java.util.function.BiFunction, com.github.benmanes.caffeine.cache.Expiry, long[ ], boolean) (line: 2592)
com.github.benmanes.caffeine.cache.BoundedLocalCache.compute(java.lang.Object, java.util.function.BiFunction, com.github.benmanes.caffeine.cache.Expiry, boolean, boolean, boolean) (line: 2542)
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$refreshIfNeeded$5(long[ ], java.lang.Object, java.util.concurrent.CompletableFuture[ ], java.lang.Object, java.lang.Object, com.github.benmanes.caffeine.cache.Node, long, java.lang.Object, java.lang.Throwable) (line: 1286)
com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$343+0x000000080147be38.accept(java.lang.Object, java.lang.Object)
java.util.concurrent.CompletableFuture.uniWhenComplete(java.lang.Object, java.util.function.BiConsumer, java.util.concurrent.CompletableFuture$UniWhenComplete) (line: 859)
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(int) (line: 837)
java.util.concurrent.CompletableFuture.postComplete() (line: 506)
java.util.concurrent.CompletableFuture$AsyncSupply.run() (line: 1769)
java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) (line: 1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run() (line: 630)
java.lang.Thread.run() (line: 832)

java.util.concurrent.ConcurrentHashMap.replaceNode(java.lang.Object, java.lang.Object, java.lang.Object) (line: 1122)
java.util.concurrent.ConcurrentHashMap.remove(java.lang.Object) (line: 1102)
com.github.benmanes.caffeine.cache.BoundedLocalCache.discardRefresh(java.lang.Object) (line: 323)
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$remap$15(boolean, java.lang.Object[ ], java.util.function.BiFunction, java.lang.Object, long[ ], int[ ], java.lang.Object, com.github.benmanes.caffeine.cache.Expiry, java.lang.Object[ ], java.lang.Object[ ], com.github.benmanes.caffeine.cache.RemovalCause[ ], com.github.benmanes.caffeine.cache.Node[ ], java.lang.Object, com.github.benmanes.caffeine.cache.Node) (line: 2653)
com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$347+0x000000080147f408.apply(java.lang.Object, java.lang.Object)
java.util.concurrent.ConcurrentHashMap.compute(java.lang.Object, java.util.function.BiFunction) (line: 1940)
com.github.benmanes.caffeine.cache.BoundedLocalCache.remap(java.lang.Object, java.lang.Object, java.util.function.BiFunction, com.github.benmanes.caffeine.cache.Expiry, long[ ], boolean) (line: 2592)
com.github.benmanes.caffeine.cache.BoundedLocalCache.compute(java.lang.Object, java.util.function.BiFunction, com.github.benmanes.caffeine.cache.Expiry, boolean, boolean, boolean) (line: 2542)
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$refreshIfNeeded$5(long[ ], java.lang.Object, java.util.concurrent.CompletableFuture[ ], java.lang.Object, java.lang.Object, com.github.benmanes.caffeine.cache.Node, long, java.lang.Object, java.lang.Throwable) (line: 1286)
com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$343+0x000000080147be38.accept(java.lang.Object, java.lang.Object)
java.util.concurrent.CompletableFuture.uniWhenComplete(java.lang.Object, java.util.function.BiConsumer, java.util.concurrent.CompletableFuture$UniWhenComplete) (line: 859)
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(int) (line: 837)
java.util.concurrent.CompletableFuture.postComplete() (line: 506)
java.util.concurrent.CompletableFuture.complete(java.lang.Object) (line: 2137)
org.xyz.BulkReloadingAsyncCacheLoader.processAndCompleteFutures() (line: 50)
org.xyz.BulkReloadingAsyncCacheLoader.access$processAndCompleteFutures(org.xyz.BulkReloadingAsyncCacheLoader) (line: 19)
org.xyz.BulkReloadingAsyncCacheLoader$asyncReload$1.run() (line: 78)
java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) (line: 1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run() (line: 630)
java.lang.Thread.run() (line: 832)
ben-manes commented 3 years ago

Can you downgrade for the time being? It looks like the rewrite of refresh broke for you somehow. If you can reproduce it, that would help.

ben-manes commented 3 years ago

I will try to look into this during the evening, PST. If you can provide more context to reproduce from, that would be very beneficial. If you can downgrade to 2.9 to unblock you, that would be a good first step.

There must be an unexpected call to remove during an explicit LoadingCache.refresh call to compute. That was intended to be independent, and skimming the code the scenario doesn't jump out at me. If we can reproduce then I can try to fix and release a patch for you asap.

ben-manes commented 3 years ago

Sorry, I noticed this is due to refreshAfterWrite, as your stack trace shows refreshIfNeeded in the callback. I will try to debug but a reproducer is needed for me to untangle this.

ben-manes commented 3 years ago

Can you provide your BulkReloadingAsyncCacheLoader? When reviewing the cache's code, I can't determine a direct cause under normal behavior. I am wondering if this class is interacting in a way that causes the deadlock.

spand commented 3 years ago

Thank you for your quick response.

I have attached the code below (it is in Kotlin). The basic idea is that we only allow one bulk reload at a time. If no bulk reload is in progress, it proceeds. Any further reloads will queue up and be handled when the first is done. I kind of ruled out our bulk reloader as the culprit since it is closer to the root of the thread than your caffeine code. Maybe what is different in our case is that we complete many futures all at once (though from the same thread).

package org.xyz

import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import java.util.concurrent.BlockingDeque
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier

private class KeyedCompletableFuture<I, E>(val key: I) : CompletableFuture<E>()

fun <I, E> Caffeine<Any, Any>.buildBulkReload(bulkLoader: (Iterable<I>) -> Map<I, E>): LoadingCache<I, E> {
    return buildAsync(BulkReloadingAsyncCacheLoader(bulkLoader)).synchronous()
}

private class BulkReloadingAsyncCacheLoader<I, E>(private val bulkLoader: (Iterable<I>) -> Map<I, E>) : AsyncCacheLoader<I, E> {

    private val NO_RELOADER = false
    private val RELOAD_IN_PROGRESS = true
    private val bulkSize = 5000
    private val queue: BlockingDeque<KeyedCompletableFuture<I, E>> = LinkedBlockingDeque(bulkSize)

    private val isLoadInProgress = AtomicBoolean(NO_RELOADER) // Cross thread lock

    override fun asyncLoad(key: I, executor: Executor): CompletableFuture<E?> {
        return CompletableFuture.supplyAsync(Supplier {
            bulkLoader(listOf(key))[key]
        }, executor)
    }

    override fun asyncLoadAll(keys: MutableIterable<I>, executor: Executor): CompletableFuture<Map<I, E>> {
        return CompletableFuture.supplyAsync(Supplier {
            bulkLoader(keys)
        }, executor)
    }

    private fun processAndCompleteFutures() {
        val drainBuffer = ArrayList<KeyedCompletableFuture<I, E>>(bulkSize)

        queue.drainTo(drainBuffer, bulkSize)

//            println("Reload  ${drainBuffer.size}")
        if (drainBuffer.isNotEmpty()) {
            try {
                val newValues = bulkLoader(drainBuffer.map { it.key })
                drainBuffer.forEach {
                    it.complete(newValues[it.key])
                }
            } catch (e: Throwable) {
                drainBuffer.forEach {
                    it.completeExceptionally(e)
                }
            }
        }
    }

    /*
    Invariant:
    queue.isNotEmpty() => one thread must be in a processing loop.

    Otherwise the value will be stuck waiting for something else to start another reload thread.
     */
    override fun asyncReload(key: I, oldValue: E, executor: Executor): CompletableFuture<E> {
        val completableFuture = KeyedCompletableFuture<I, E>(key)
        queue.put(completableFuture)

        val iAmUpdater = isLoadInProgress.compareAndSet(NO_RELOADER, RELOAD_IN_PROGRESS)
        // Only one is allowed to reload
        if (iAmUpdater) {
            // Start a thread to process the queue
            executor.execute {
                try {
                    // We continue processing while the queue is not empty
                    while (queue.isNotEmpty()) {
                        processAndCompleteFutures()
                    }
                } finally {
                    // Ok. We are done.
                    isLoadInProgress.set(false)
                }
                // There is a race condition here: between queue.isNotEmpty and isLoadInProgress.set(false) the queue
                // could have been updated. We only exit if we see an empty queue and thus we can keep the invariant.
                while (queue.isNotEmpty()) {
                    val stillUpdater = isLoadInProgress.compareAndSet(NO_RELOADER, RELOAD_IN_PROGRESS)
                    if (stillUpdater) {
                        try {
                            while (queue.isNotEmpty()) {
                                processAndCompleteFutures()
                            }
                        } finally {
                            isLoadInProgress.set(false)
                        }
                    }
                }
            }
        }

        return completableFuture
    }
}

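We also use a non-standard thread pool, but only to give it a custom name:

import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

private class CachepoolThreadFactory() : ThreadFactory {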
    private companion object {
        private val poolNumber = AtomicInteger(1)
    }

    private val threadNumber = AtomicInteger(1)
    private val namePrefix = "cachepool-" + poolNumber.getAndIncrement() + "-thread-"

    private val defaultThreadFactory: ThreadFactory = Executors.defaultThreadFactory()

    override fun newThread(r: Runnable): Thread {
        return defaultThreadFactory.newThread(r).apply {
            name = namePrefix + threadNumber.getAndIncrement()
        }
    }
}
ben-manes commented 3 years ago

Thanks. From the limited information I can’t determine a cause yet. Can you provide more stacktraces, privately if needed?

My guess would be that during the asyncReload the thread is completing other reload futures, e.g. via a TPE’s caller runs policy. This causes a deadlock because the refresh is within a compute() and the future’s whenComplete() handler does a remove(). As writes within a computation deadlock ConcurrentHashMap, this causes your failures.

In v3 the futures are maintained in a map, which allows for more advanced and robust behavior. In v2 it was fire-and-forget with hacking the write timestamp to avoid duplicates. If your async reload writes back on the calling thread then v3 isn’t compatible with that.
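The point about whenComplete() handlers can be illustrated with plain JDK classes. The following is a minimal sketch, not Caffeine's code: the class and method names are made up for illustration. It shows that a handler registered with the non-async whenComplete on a still-incomplete future runs on whichever thread later calls complete(), which is why the thread that completes a reload future ends up executing the cache's callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only JDK APIs are used.
final class CompletionThreadDemo {

  /** Returns the name of the thread that ran the whenComplete handler. */
  static String callbackThread(String completerName) {
    CompletableFuture<String> future = new CompletableFuture<>();
    AtomicReference<String> ranOn = new AtomicReference<>();

    // Registered while the future is incomplete, so it runs inside complete().
    future.whenComplete((value, error) -> ranOn.set(Thread.currentThread().getName()));

    Thread completer = new Thread(() -> future.complete("value"), completerName);
    completer.start();
    try {
      completer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return ranOn.get();
  }

  public static void main(String[] args) {
    System.out.println(callbackThread("loader-thread")); // prints "loader-thread"
  }
}
```

If the completing thread is also the one sitting inside a map computation, the handler's own map write happens inside that computation.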

spand commented 3 years ago

I will get my colleague to provide more.

If you can help me unpack your last paragraph.

I read it as: we must not call future.complete() synchronously from within the AsyncCacheLoader.asyncReload(). If that is the case then I cannot see how that should take place as all work is done in a Runnable given to executor.execute(). I believe that is the case shown in the second stacktrace provided above.

ben-manes commented 3 years ago

You read that correctly. The given stacktrace doesn’t indicate that, so I am guessing it might happen if you have your executor using the caller runs policy or equivalent.
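For readers unfamiliar with the caller runs policy mentioned here, the sketch below shows the behavior using only the JDK's ThreadPoolExecutor. The class name and pool sizing are assumptions chosen to force a rejection; this is not the executor used in the issue. When the pool is saturated, CallerRunsPolicy executes the rejected task on the submitting thread, so an executor.execute call inside asyncReload could run the work synchronously.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only JDK APIs are used.
final class CallerRunsDemo {

  /** Returns the name of the thread that ran a task submitted to a saturated pool. */
  static String threadOfRejectedTask() {
    // One worker and no queue capacity, so a second task is always rejected.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    CountDownLatch release = new CountDownLatch(1);
    AtomicReference<String> ranOn = new AtomicReference<>();
    try {
      // Occupy the only worker until the latch is released.
      pool.execute(() -> {
        try {
          release.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      // Rejected by the pool; CallerRunsPolicy runs it right here, synchronously.
      pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
      return ranOn.get();
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(threadOfRejectedTask().equals(Thread.currentThread().getName()));
  }
}
```

With the default AbortPolicy, the second execute would instead throw RejectedExecutionException and nothing would run on the caller.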

Stephan202 commented 3 years ago

@ben-manes if caller-runs kicks in, wouldn't the second stack trace then be longer, showing the code that schedules the task (in this case the executor.execute invocation?) :thinking:

Reason I'm interested in this issue is that we might have seen the same problem a few weeks back, also with a bulk refresh cache loader implementation. (If relevant I can provide further details, though the associated code has been modified since then and is spread across different places. Plus, it cannot be shared publicly here. We could set up a brief call if you're interested; I can collect the code as it was back then and walk you through it.)

ben-manes commented 3 years ago

I am free for a call in the evenings PST or weekends. You can also send me emails for private correspondence.

I agree the stacktrace above doesn't show caller runs, but it may be in another trace not shown. Given the limited information, that's my best guess trying to walk through the code. Of course it could be a wrong guess.

Stephan202 commented 3 years ago

Alright, I just sent you an email with minimal context. Hopefully it's helpful; otherwise I can provide further details tomorrow. (I'm in CEST, so going to bed now :).)

spand commented 3 years ago

I have verified that the executor in use (also shown above) uses AbortPolicy.

We had a look at the thread dump again. Most (~100) threads are locked in ConcurrentHashMap.compute(), presumably waiting for a lock (the first stacktrace). A few (~5) have identical stacktraces matching the second stacktrace posted.

Looking at the second stacktrace again, I notice that they all seem to have acquired the "compute" lock as they have proceeded deeper in the call stack. What strikes me as extremely suspicious is:

--------->  java.util.concurrent.ConcurrentHashMap.replaceNode(java.lang.Object, java.lang.Object, java.lang.Object) (line: 1122)
        java.util.concurrent.ConcurrentHashMap.remove(java.lang.Object) (line: 1102)
        com.github.benmanes.caffeine.cache.BoundedLocalCache.discardRefresh(java.lang.Object) (line: 323)
        com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$remap$15(boolean, java.lang.Object[ ], java.util.function.BiFunction, java.lang.Object, long[ ], int[ ], java.lang.Object, com.github.benmanes.caffeine.cache.Expiry, java.lang.Object[ ], java.lang.Object[ ], com.github.benmanes.caffeine.cache.RemovalCause[ ], com.github.benmanes.caffeine.cache.Node[ ], java.lang.Object, com.github.benmanes.caffeine.cache.Node) (line: 2653)
        com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$347+0x000000080147f408.apply(java.lang.Object, java.lang.Object)
--------->  java.util.concurrent.ConcurrentHashMap.compute(java.lang.Object, java.util.function.BiFunction) (line: 1940)
        com.github.benmanes.caffeine.cache.BoundedLocalCache.remap(java.lang.Object, java.lang.Object, java.util.function.BiFunction, com.github.benmanes.caffeine.cache.Expiry, long[ ], boolean) (line: 2592)
        com.github.benmanes.caffeine.cache.BoundedLocalCache.compute(java.lang.Object, java.util.function.BiFunction, com.github.benmanes.caffeine.cache.Expiry, boolean, boolean, boolean) (line: 2542)
....

Both marked methods take a lock on a node using synchronized. Can you elaborate on why it is safe to access the ConcurrentHashMap from within ConcurrentHashMap.compute()? It seems pretty unsafe if one thread takes a lock on node A and calls remove, which attempts to lock node B, while a different thread does the same for nodes B and A, respectively.

It seems pretty similar to what you warn about in CacheLoader:

Warning: loading must not attempt to update any mappings of this cache directly.
ben-manes commented 3 years ago

This is a bit confusing because those are two different hash map instances. The cache entries are held in data and the in-flight refresh futures are held in refreshes. They should be orchestrated so that recursive calls do not occur. The concern is if they somehow did, e.g. why I asked about the executor's rejection policy which could result in a recursive write.

A simplified version of refresh occurs as,

// A one-element array lets the lambda record whether it created the mapping
boolean[] added = { false };
var refreshFuture = refreshes.computeIfAbsent(keyReference, k -> {
  added[0] = true;
  return cacheLoader.asyncReload(key, oldValue, executor);
});
if (added[0]) {
  refreshFuture.whenComplete((newValue, error) -> {
    data.compute(key, (k, currentValue) -> {
      // Only apply the new value if this refresh is still the registered one
      boolean removed = refreshes.remove(keyReference, refreshFuture);
      return (currentValue != null) && removed ? newValue : currentValue;
    });
  });
}

As shown, the data.compute does a refreshes.remove and refreshes.computeIfAbsent does not write into data. However, if the asyncReload called future.complete(newValue) then this would cause a recursive write as the whenComplete is called within the refreshes.computeIfAbsent and calls refreshes.remove.

When a new write occurs, e.g. cache.put or an expiration with cache.get, then the data.compute calls refreshes.remove to discard any prior in-flight future. This avoids ABA style problems where the old refresh might complete and update to a stale value, so the discardRefresh is called for a linearizable flow.
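The discard-on-write idea can be sketched with two plain ConcurrentHashMap instances. This is a toy model under stated assumptions, not Caffeine's implementation: the class, the String keys and values, and the put/refresh methods are hypothetical stand-ins for the data and refreshes maps described above. A refresh result is applied only if its future is still the registered one, so a write that arrives in between wins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy model of the two-map orchestration; not Caffeine's code.
final class RefreshDiscardSketch {
  final ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();
  final ConcurrentHashMap<String, CompletableFuture<String>> refreshes =
      new ConcurrentHashMap<>();

  /** Registers an in-flight refresh, or returns the one already registered. */
  CompletableFuture<String> refresh(String key) {
    CompletableFuture<String> created = new CompletableFuture<>();
    CompletableFuture<String> existing = refreshes.putIfAbsent(key, created);
    if (existing != null) {
      return existing;
    }
    created.whenComplete((newValue, error) ->
        data.compute(key, (k, current) -> {
          // Apply the result only if this refresh was not discarded meanwhile.
          boolean removed = refreshes.remove(key, created);
          return (current != null && removed && error == null) ? newValue : current;
        }));
    return created;
  }

  /** An explicit write discards any in-flight refresh for the key. */
  void put(String key, String value) {
    data.compute(key, (k, current) -> {
      refreshes.remove(key);
      return value;
    });
  }

  public static void main(String[] args) {
    RefreshDiscardSketch cache = new RefreshDiscardSketch();
    cache.put("k", "v1");
    CompletableFuture<String> inFlight = cache.refresh("k");
    cache.put("k", "v2");              // discards the in-flight refresh
    inFlight.complete("stale");        // ignored, the mapping stays "v2"
    System.out.println(cache.data.get("k"));
  }
}
```

Note that the write into refreshes happens inside a data computation, which is safe only as long as nothing blocks or recurses inside a refreshes computation.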

ben-manes commented 3 years ago

So far I can only think of one scenario which might show a recursive compute, however I can also disprove it. I am mentioning it in case my understanding is incorrect or it sparks ideas for other scenarios.

https://github.com/ben-manes/caffeine/blob/ba785593a091070000e0aa611f9d9802e991ffd6/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java#L1242-L1256

The Async.isReady checks if the future has completed successfully, e.g. by calling future.isDone(). The oldValue is then extracted via future.join(). If the future is incomplete then waitingGet is called which calls postComplete to run the whenComplete handlers. Those handlers update the mapping to assign the entry's weight and expiration. In doing so, the discardRefresh would be called.

This scenario doesn't make sense for a few reasons. Since the future is incomplete, it is not eligible for refresh. I am unsure if it will get past the guard conditions if the future's compute time exceeds the refresh interval. If so, the isDone check should indicate that the future is incomplete and a no-op would occur. The future.join() would never be run in a case where postComplete is invoked.

ben-manes commented 3 years ago

@perlambaek @spand can you please email me the full stack trace. If you can reproduce, that would be amazing.

@Stephan202's problem appeared to be due to the application code blocking forever in CacheLoader.load because an HTTP client never returned. This kept the cache's entry locked; when the entry became eligible for eviction, the eviction was blocked, and further pile-ups ensued. I believe a network timeout to recover from unresponsiveness would resolve that issue.
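One way to bound a load that may never return is sketched below. It is an assumption-laden illustration rather than the fix that was applied: it presumes the remote call is available as a CompletableFuture and uses the JDK's CompletableFuture.orTimeout (Java 9+); the class and helper names are hypothetical. A blocking HTTP client would instead need its own connect and read timeouts configured.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; only JDK APIs are used.
final class LoadTimeoutSketch {

  /** Fails the given load with a TimeoutException if it is not done in time. */
  static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> load, long millis) {
    return load.orTimeout(millis, TimeUnit.MILLISECONDS);
  }

  /** Returns true if waiting for the load ended because the timeout fired. */
  static boolean timesOut(CompletableFuture<?> load, long millis) {
    try {
      withTimeout(load, millis).join();
      return false;
    } catch (CompletionException e) {
      return e.getCause() instanceof TimeoutException;
    }
  }

  public static void main(String[] args) {
    // Stands in for a remote call that never answers.
    CompletableFuture<String> unresponsive = new CompletableFuture<>();
    System.out.println(timesOut(unresponsive, 100)); // prints "true"
  }
}
```

With a timeout in place, the entry's lock is released once the load fails, so eviction and other callers are no longer stuck behind it indefinitely.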

I have been unable to eyeball a problem and don't have enough to debug this issue. Anything you can provide, public or private, would be helpful.

Stephan202 commented 3 years ago

Thanks a lot @ben-manes for helping out. Besides identifying the root cause you also provided excellent advice on how we could better integrate Caffeine, by moving some blocking logic such that other callers aren't impacted.

(We learned a lot about Caffeine in the process of building our own "bulk refresh cache loader". Once the code has run stably in production for a few months we should perhaps consider open-sourcing our approach. TBD.)

ben-manes commented 3 years ago

@Stephan202 If helpful, the examples directory is an unsupported, "use at your own risk", contribution section. If you wanted to open-source but not support, I'd be happy to accept a PR for the code dump.

Stephan202 commented 3 years ago

Good idea; will definitely consider that option :+1:

spand commented 3 years ago

@ben-manes We don't think a full thread dump will provide more as it's quite noisy. Going forward we will instead try to make a small reproducer or see if we can spot any incorrect usages of Caffeine.

Thanks again for your time here.

ben-manes commented 3 years ago

Sometimes the full set of stack traces can be useful. @Stephan202 provided his and that allowed me to narrow down the cause of his problem. Otherwise any other insights into what you are seeing, your use-case, and related code would be helpful.

I am closing this for now because there is nothing else that I can do. Let's reopen when you can provide more details to help us debug the issue. I am sorry that you are experiencing these problems and it will be a high priority for me if we can gain more insight into the root cause.

ben-manes commented 3 years ago

When reviewing the BulkReloadingAsyncCacheLoader code, there could be a problem if the blocking queue is full and (for some reason) no draining thread is in-flight. In that case no refresh caller could make progress, as they would all wait on queue.put. Because this blocks forever while running under a map computation, other cache calls might try to invalidate the refresh (e.g. an eviction) and be blocked waiting for the map compute to complete. This would cause a deadlock as observed in your original stack trace snippets. The 2.x release would tolerate this better because it does not maintain a map of in-flight refresh operations, thereby lacking linearizability (ABA problems).

Consider instead using queue.offer, perhaps with a retry loop that assists with the draining when the offer fails. Alternatively, asyncReload could return the future immediately, with a cleaner handoff to the async phase.
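A rough sketch of the queue.offer suggestion follows. It is one possible shape under stated assumptions, not a drop-in replacement for the Kotlin loader: the class name, the reload method (modelled on asyncReload but without the Caffeine interface or oldValue), and the choice to fail the future when the queue is full are all hypothetical. It also assumes the executor runs tasks on a different thread than the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical non-blocking batching reloader; only JDK APIs are used.
final class NonBlockingBulkReloader<K, V> {
  private static final class Pending<K, V> {
    final K key;
    final CompletableFuture<V> future = new CompletableFuture<>();
    Pending(K key) { this.key = key; }
  }

  private final LinkedBlockingQueue<Pending<K, V>> queue;
  private final AtomicBoolean draining = new AtomicBoolean();
  private final Function<List<K>, Map<K, V>> bulkLoader;

  NonBlockingBulkReloader(int capacity, Function<List<K>, Map<K, V>> bulkLoader) {
    this.queue = new LinkedBlockingQueue<>(capacity);
    this.bulkLoader = bulkLoader;
  }

  /** Enqueues a reload and returns promptly; never waits for queue space. */
  CompletableFuture<V> reload(K key, Executor executor) {
    Pending<K, V> pending = new Pending<>(key);
    if (!queue.offer(pending)) {
      // Full queue: fail fast rather than block inside the cache's computation.
      pending.future.completeExceptionally(
          new RejectedExecutionException("reload queue is full"));
    } else if (draining.compareAndSet(false, true)) {
      executor.execute(this::drain);
    }
    return pending.future;
  }

  private void drain() {
    do {
      try {
        List<Pending<K, V>> batch = new ArrayList<>();
        while (queue.drainTo(batch) > 0) {
          completeBatch(batch);
          batch.clear();
        }
      } finally {
        draining.set(false);
      }
      // An item may arrive after the last drainTo but before the flag is
      // cleared; loop again only if this thread wins the flag back.
    } while (!queue.isEmpty() && draining.compareAndSet(false, true));
  }

  private void completeBatch(List<Pending<K, V>> batch) {
    try {
      List<K> keys = new ArrayList<>();
      for (Pending<K, V> p : batch) keys.add(p.key);
      Map<K, V> values = bulkLoader.apply(keys);
      for (Pending<K, V> p : batch) p.future.complete(values.get(p.key));
    } catch (RuntimeException e) {
      for (Pending<K, V> p : batch) p.future.completeExceptionally(e);
    }
  }
}
```

Failing the future when the queue is full trades a skipped refresh for guaranteed progress; the retry-and-assist variant mentioned above would need extra care so the caller never completes futures while still inside the cache's computation.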

This code certainly looks racy and problematic. I don't know if that's the actual cause, but it would have a larger blast area in 3.x.

ben-manes commented 3 years ago

Yes, this is certainly the cause. When the queue is full and the drain is running, there are blocked map computations. The drainer tries to complete existing futures to populate them, triggering the callbacks to remove the future from the refresh map. Due to hash collisions, that remove operation is blocked waiting on the compute to release the hash bin. This causes the deadlock.

The blocking within the asyncReload creates problems in v3. It’s a concern regardless as it stalls cache read requests waiting for the bulk load to complete. The blocking queue should be managed differently so that the asyncReload returns promptly.

ben-manes commented 3 years ago

In your case, I think what you want might be better handled by a reactive library. I haven't used those, but Reactor's bufferTimeout seems like a good candidate. This will accumulate work into a list until either a timeout elapses or a max size is reached. Presumably this is unbounded, though if not you could return a failed CompletableFuture to effectively cancel the reload and unblock so that at worst it stalls the cache operations rather than deadlocks.

Sorry for not catching this earlier, but now I am confident about what your problem is and why it changed when upgrading, and that, while this is a desirable scenario to support, the problem is not a bug within this library. I tried to clarify the JavaDoc to help others who might attempt this and assume it would work due to a lack of warnings.

spand commented 3 years ago

Yes, that seems correct. I must have been so focused on the remapping that I completely missed the blocking put call.

Thanks for taking the time to look into this and your suggestions !