ben-manes / caffeine

A high performance caching library for Java
Apache License 2.0

Race Condition in Cache Refresh #1690

Closed · seashore115 closed this issue 4 months ago

seashore115 commented 4 months ago

I am currently using the Caffeine cache to avoid cache misses during a cache refresh. My expectation is that during the refresh the cache always returns the stale value, and after the refresh completes it returns the latest value. The cache is supposed to never miss and to load only once. However, when I ran the code below, it looks like the cache refreshed twice, or a cache miss triggered another load.

Here is the failing test case:

private static int callCount = 0;
private final LoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
        .removalListener(this::listenRemoval)
        .executor(MoreExecutors.directExecutor())
        .recordStats()
        .build(new CacheLoader<>() {
            @Override
            public Integer load(Integer _key) {
                Integer val = getValue();
                System.out.println("Loading value: " + val);
                return val;
            }
        });

private void listenRemoval(Integer key, Integer _value, RemovalCause cause) {
    // We don't want to reload if the value was just replaced
    if (cause.wasEvicted() || cause == RemovalCause.EXPLICIT) {
        cache.refresh(key);
    }
}

private Integer getValue() {
        callCount++;
        return callCount;
}

@RepeatedTest(20)
public void testCacheEvictionRefresh() throws BrokenBarrierException, InterruptedException {
    cache.get(1);
    assertThat(cache.stats().loadCount()).isEqualTo(1);

    final CyclicBarrier gate = new CyclicBarrier(7);

    Thread t1 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("t1 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t2 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("t2 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t6 = new Thread(){
        public void run(){
            try {
                gate.await();
                Thread.sleep(5);
                System.out.println("t6 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t3 = new Thread(){
        public void run(){
            try {
                gate.await();
                Thread.sleep(5);
                System.out.println("t3 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t4 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("Cache Invalidate start");
                cache.invalidateAll();
                System.out.println("Cache Invalidate end");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t5 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("t5 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
    t6.start();

    gate.await();
    assertThat(cache.stats().loadCount()).isEqualTo(1);
    Thread.sleep(2000);
}

The result shows:

Loading value: 1
Cache Invalidate start
t1 Cache retrieval:1
t2 Cache retrieval:1
t5 Cache retrieval:1
Loading value: 2
Loading value: 3
t3 Cache retrieval:3
t6 Cache retrieval:3
Cache Invalidate end

ben-manes commented 4 months ago

Can you see if this is related to #1478? Sorry, I haven't been able to work on these issues.

seashore115 commented 4 months ago

Hi Ben, thanks for your quick reply. I don't think https://github.com/ben-manes/caffeine/issues/1478 applies here: as you mentioned there, refresh has to work together with expiration, while my test case first invalidates the cache and then triggers the refresh. Thus, I think they might be different issues.

ben-manes commented 4 months ago

Thanks. If you override CacheLoader.reload then you'll be able to distinguish if it was a cache miss or reloaded twice. I'll try to trace through this tonight. I recall we sometimes had to be aggressive in discarding a refresh in order to have stronger linearizable behavior, so a discarded reload may happen (e.g. if a write explicitly forced it to be discarded).
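
For illustration, a reload override along these lines makes the distinction visible, since reload is only invoked when a stale value is being replaced (a minimal sketch that reuses the getValue() helper and cache setup from the test above; the log messages are just placeholders):

```java
LoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
    .build(new CacheLoader<>() {
      @Override
      public Integer load(Integer key) {
        // Called for a cache miss, or for a refresh when no entry is present
        System.out.println("load (no existing entry) for key " + key);
        return getValue();
      }
      @Override
      public Integer reload(Integer key, Integer oldValue) {
        // Called only when a refresh replaces an existing (stale) value
        System.out.println("reload (replacing " + oldValue + ") for key " + key);
        return getValue();
      }
    });
```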

ben-manes commented 4 months ago

I modified your tests slightly to remove the races so it fails consistently. What I see is,

  1. The entry is explicitly loaded (a miss + load)
  2. Some threads hit
  3. The cache is cleared
  4. The removal listener triggers a refresh
  5. Some threads hit as the refresh completes

Since the cache entry wasn't present, the refresh acts like an asynchronous load, so only CacheLoader.load is called as there is no stale value being replaced. Since that is a new load, but not a cache miss, the loadCount=2 and the missCount=1.

I think this is working as expected because of the explicit invalidation causing the entry to be discarded, so a new load is required. The loadCount includes refreshes, which I think you assumed it did not?
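
Put in terms of the stats, the counts after this sequence come out as follows (a sketch of the assertions that would hold; the test's own expectation of loadCount == 1 is what fails):

```java
// missCount only counts lookups that found no entry; loadCount also counts
// the load performed on behalf of the refresh, hence 2 rather than 1.
assertThat(cache.stats().missCount()).isEqualTo(1);
assertThat(cache.stats().loadCount()).isEqualTo(2);
```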

test code

```java
import static com.google.common.truth.Truth.assertThat;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.RepetitionInfo;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.util.concurrent.MoreExecutors;

public class Issue1690 {
  private static int callCount = 0;
  private final LoadingCache<Integer, Integer> cache =
      Caffeine.newBuilder()
          .removalListener(this::listenRemoval)
          .executor(MoreExecutors.directExecutor()).recordStats()
          .build(new CacheLoader<>() {
            @Override
            public Integer load(Integer _key) {
              Integer val = getValue();
              System.out.println("Loading value: " + val);
              return val;
            }
            @Override
            public Integer reload(Integer _key, Integer _oldValue) {
              Integer val = getValue();
              System.out.printf("Reloading value: %s -> %s%n", _oldValue, val);
              return val;
            }
          });

  private void listenRemoval(Integer key, Integer _value, RemovalCause cause) {
    // We don't want to reload if the value was just replaced
    if (cause.wasEvicted() || cause == RemovalCause.EXPLICIT) {
      System.out.printf("Refreshing %s -> %s%n", key, _value);
      cache.refresh(key);
    }
  }

  private Integer getValue() {
    callCount++;
    return callCount;
  }

  @RepeatedTest(20)
  public void testCacheEvictionRefresh(RepetitionInfo info) throws BrokenBarrierException, InterruptedException {
    System.out.println("START #" + info.getCurrentRepetition());

    cache.get(1);
    System.out.println(cache.stats());
    assertThat(cache.stats().loadCount()).isEqualTo(1);

    final CyclicBarrier gate = new CyclicBarrier(7);

    Thread t1 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("t1 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t2 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("t2 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t6 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          Thread.sleep(5);
          System.out.println("t6 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t3 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          Thread.sleep(5);
          System.out.println("t3 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t4 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("Cache Invalidate start");
          cache.invalidateAll();
          System.out.println("Cache Invalidate end");
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t5 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("t5 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
    t6.start();

    gate.await();
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();

    System.out.println(cache.stats());
    assertThat(cache.stats().loadCount()).isEqualTo(1);
    Thread.sleep(2000);

    System.out.println("END\n\n");
  }
}
```

test output

```console
Loading value: 1
CacheStats{hitCount=0, missCount=1, loadSuccessCount=1, loadFailureCount=0, totalLoadTime=163958, evictionCount=0, evictionWeight=0}
Cache Invalidate start
t1 Cache retrieval:1
t5 Cache retrieval:1
t2 Cache retrieval:1
Refreshing 1 -> 1
Loading value: 2
Cache Invalidate end
t3 Cache retrieval:2
t6 Cache retrieval:2
CacheStats{hitCount=5, missCount=1, loadSuccessCount=2, loadFailureCount=0, totalLoadTime=1510375, evictionCount=0, evictionWeight=0}
```
seashore115 commented 4 months ago

Hi Ben, thanks for looking into it. I ran your tests locally, but the missCount is more than 1:

START #1
Loading value: 1
CacheStats{hitCount=0, missCount=1, loadSuccessCount=1, loadFailureCount=0, totalLoadTime=381299, evictionCount=0, evictionWeight=0}
Cache Invalidate start
t1 Cache retrieval:1
t5 Cache retrieval:1
t2 Cache retrieval:1
Refreshing 1 -> 1
Loading value: 2
Loading value: 3
t6 Cache retrieval:3
t3 Cache retrieval:3
Cache Invalidate end
CacheStats{hitCount=4, missCount=2, loadSuccessCount=3, loadFailureCount=0, totalLoadTime=2637255, evictionCount=0, evictionWeight=0}

Is this expected? If so, how can I avoid a cache miss between the invalidation and the refresh?

ben-manes commented 4 months ago

You'd have a race condition because when invalidated the entry is gone, so a lookup might start before the refresh does.

It sounds like refresh is being used to reload the cache rather than being paired with expiration. Refresh probably doesn't make sense on its own, but Guava's cache didn't disallow it. The intent was to let hot entries be reloaded while cold ones fade away by expiration, so hot entries do not take a latency hit from expiring.

If you are instead trying to periodically reload an unbounded cache, then it's better to use a simple scheduled task. That way you can replace the values without the race of a temporary gap, and callers see the current value while it is being reloaded. Is that what you're aiming for?
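
A minimal sketch of that scheduled-reload approach (the Integer key/value types, the loadAll() helper, and the one-minute period are assumptions for illustration, not part of the suggestion):

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

class ScheduledReload {
  private final Cache<Integer, Integer> cache = Caffeine.newBuilder().build();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  void start() {
    scheduler.scheduleWithFixedDelay(() -> {
      // Hypothetical bulk fetch of the current source data
      Map<Integer, Integer> fresh = loadAll();
      // putAll replaces entries in place, so readers keep seeing the old value
      // until the new one is written and there is no window with a missing entry.
      // (Keys removed from the source would still need explicit invalidation.)
      cache.putAll(fresh);
    }, 0, 1, TimeUnit.MINUTES);
  }

  Map<Integer, Integer> loadAll() {
    return Map.of(1, 1); // placeholder for the real data source
  }
}
```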

seashore115 commented 4 months ago

Gotcha. Yep, you are correct.

Is this what you recommend? executor.submit(() -> cache.asMap().replace(K, V)); If so, will it lock the hot key during that time?

ben-manes commented 4 months ago

It will lock for other writes to that key, but reads are lock-free. The lock duration will be small.

If you don't need any other features then you can use a ConcurrentHashMap directly. An even simpler approach is to have an immutable map that you rebuild periodically.

volatile Map<K, V> data = Map.of();

scheduledExecutor.scheduleWithFixedDelay(() -> {
    var results = loadAll();
    data = Map.copyOf(results);
}, 0, 1, TimeUnit.MINUTES);
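
Lookups then just read data.get(key); because data is volatile and Map.copyOf returns an immutable map, callers always see either the previous snapshot or the new one in full, never a partially rebuilt map, which avoids the gap seen with invalidate-then-refresh.
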
seashore115 commented 4 months ago

Gotcha, thanks Ben. That works.