jhalterman / expiringmap

A high performance thread-safe map that expires entries
Apache License 2.0

ExpiringMap may return already expired entry (ExpirationPolicy.ACCESSED) #77

Status: Open. Opened by nuessgens 2 years ago.

nuessgens commented 2 years ago

We experienced a situation where ExpiringMap returned an expired entry after the ExpirationListener had already been notified that this specific entry expired. The map is configured with ExpirationPolicy.ACCESSED (version 0.5.10).

The expired entry is returned by subsequent calls to get(Object), not just once, until it goes unaccessed for the expiration period again. At that point it expires a second time and is correctly removed from the map.

I don't know whether anybody else has experienced this issue, or whether this behaviour is a problem for other applications at all. A little (simplified) background on why it was an issue, at least in our application ;-) : we use the map to "cache" one "sequentializer" (which owns some kind of thread) per "destination" (the key). When a destination is no longer "used", we want to remove its sequentializer and stop the backing thread to minimize resources. As soon as there is a task for that destination again, we create and cache a new sequentializer, which is reused until it has not been needed for a specified time. For this use case the handy ExpiringMap is a great fit (great work, by the way :-) ).

This issue occurs only under very rare conditions. It appears to be a timing issue between the application thread and the "ExpiringMap-Expirer" thread, where the application gets the entry at the same moment the Expirer is removing it.

I have created a simplified test case that tries to reproduce the issue (see TestExpiringMap.java below). When the issue occurs, the following sequence of log events appears:

09:12:35.115[ExpiringMap-Expirer]: TestExpiringMap$Worker@3f44d4b - stop...
09:12:35.115[ExpiringMap-Expirer]: TestExpiringMap$Worker@3f44d4b - ...stopped
09:12:35.116[main]: ERROR: TestExpiringMap$Worker@3f44d4b already stopped!
09:12:35.118[main]: ERROR: TestExpiringMap$Worker@3f44d4b already stopped!
09:12:35.119[main]: ERROR: TestExpiringMap$Worker@3f44d4b already stopped!
09:12:35.625[ExpiringMap-Expirer]: TestExpiringMap$Worker@3f44d4b - stop...
09:12:35.625[ExpiringMap-Expirer]: ERROR: TestExpiringMap$Worker@3f44d4b - ... was already stopped!

It is easier to reproduce if one modifies ExpiringMap itself and adds a Thread.sleep(1); to the get(Object) method. This does not change any of the locking; it only "simulates" unfortunate thread scheduling.

  @Override
  @SuppressWarnings("unchecked")
  public V get(Object key) {
      ExpiringEntry<K, V> entry = getEntry(key);
      try {
          //simulate unfortunate thread-scheduling
          Thread.sleep(1);
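      } catch (InterruptedException e) {
          // restore the interrupt flag instead of silently swallowing it
          Thread.currentThread().interrupt();
      }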
      if (entry == null) {
          return load((K) key);
      } else if (ExpirationPolicy.ACCESSED.equals(entry.expirationPolicy.get())) {
          resetEntry(entry, false);
      }
      return entry.getValue();
  }

The sequence seems to be:

  1. main acquires the read lock; getEntry returns the entry k1=v1, which is kept in a local variable; main releases the read lock.
  2. Expirer acquires the write lock, removes k1=v1, notifies the expiration listener, and releases the write lock.
  3. main now checks whether ExpirationPolicy.ACCESSED is used and resets the entry.
  4. main acquires the write lock; k1=v1 is "reordered" by removing k1 (which has no effect, since the Expirer already removed it) and re-putting the already expired v1.
  5. get(Object) returns the already expired value v1 to main.
  6. All subsequent calls to get(Object) for k1 return the expired v1 until it expires again.
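The interleaving above can be replayed deterministically, without threads, by calling the steps in the listed order. The following is a hypothetical, simplified model and not ExpiringMap's source: a `LinkedHashMap` protected by a `ReentrantReadWriteLock` stands in for the library's internal entry map, and the names `Model`, `lookup`, `expire`, `reset` and `runInterleaving` are illustrative only. It runs the sequence once with the reset as described (remove, then re-put unconditionally) and once with an assumed guard that only reorders the entry if the map still holds that exact entry object.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the race; NOT ExpiringMap's source.
final class StaleResetDemo {

    static final class Entry<K, V> {
        final K key;
        final V value;
        long expiresAtNanos;

        Entry(K key, V value, long expiresAtNanos) {
            this.key = key;
            this.value = value;
            this.expiresAtNanos = expiresAtNanos;
        }
    }

    static final class Model<K, V> {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        // Insertion order stands in for the expiration order of the real map.
        private final Map<K, Entry<K, V>> entries = new LinkedHashMap<>();
        private final boolean guarded;

        Model(boolean guarded) {
            this.guarded = guarded;
        }

        void put(K key, V value, long ttlNanos) {
            lock.writeLock().lock();
            try {
                entries.put(key, new Entry<>(key, value, System.nanoTime() + ttlNanos));
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Step 1: read the entry under the read lock, then release the lock.
        Entry<K, V> lookup(K key) {
            lock.readLock().lock();
            try {
                return entries.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }

        // Step 2: the "Expirer" removes the entry under the write lock.
        void expire(K key) {
            lock.writeLock().lock();
            try {
                entries.remove(key);
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Steps 3-4: move the entry to the end and refresh its deadline.
        void reset(Entry<K, V> entry, long ttlNanos) {
            lock.writeLock().lock();
            try {
                // Assumed guard: skip the reorder if the map no longer holds this exact entry.
                if (guarded && entries.get(entry.key) != entry) {
                    return;
                }
                entries.remove(entry.key); // no-op if the Expirer already removed it
                entry.expiresAtNanos = System.nanoTime() + ttlNanos;
                entries.put(entry.key, entry); // re-inserts the expired entry when unguarded
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    // Replays steps 1-4 in order; returns true if the expired entry is back in the map.
    static boolean runInterleaving(boolean guarded) {
        long ttl = TimeUnit.MILLISECONDS.toNanos(500);
        Model<String, String> map = new Model<>(guarded);
        map.put("k1", "v1", ttl);
        Entry<String, String> seen = map.lookup("k1"); // step 1 (main)
        map.expire("k1");                              // step 2 (Expirer)
        map.reset(seen, ttl);                          // steps 3-4 (main)
        return map.lookup("k1") != null;               // step 6: is k1 still present?
    }

    public static void main(String[] args) {
        System.out.println("unguarded, expired entry back in map: " + runInterleaving(false));
        System.out.println("guarded, expired entry back in map: " + runInterleaving(true));
    }
}
```

In this model the unguarded reset puts the expired entry back (`true`) while the guarded one does not (`false`). Note that the guard only prevents the re-insertion behind step 6; the single `get(Object)` call that raced with the Expirer still holds a reference to v1, so a fix in the map itself would presumably also have to treat a skipped reset as a miss.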

TestExpiringMap.java

import java.time.LocalTime;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.jodah.expiringmap.ExpirationListener;
import net.jodah.expiringmap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;

public class TestExpiringMap {

    private static boolean issueOccured;

    public static void main(String[] args) {
        ExpirationListener<String, Worker> l = (k, worker) -> {
            worker.stop();
        };

        Map<String, Worker> workerCache = ExpiringMap.builder()
            .expiration(500, TimeUnit.MILLISECONDS)
            .expirationPolicy(ExpirationPolicy.ACCESSED)
            .expirationListener(l)
            .build();

        while (!issueOccured) {
            log("########### NEXT ############");
            sendMessageLoop(workerCache);
            sleepExact(750);
        }
    }

    private static void sendMessageLoop(Map<String, Worker> workerCache) {

        //create a cached worker
        doWork(workerCache);

        //try to hit the exact time, when the entry would expire
        for (int sleep = 499; sleep <= 520; sleep++) {
            if (issueOccured) {
                break;
            }
            sleepExact(sleep); //wait for it

            // verify that each invocation (in this case three) got the expired entry.
            // not only just once (but until this expired entry will expire "again")
            doWork(workerCache);
            doWork(workerCache);
            doWork(workerCache);
        }

    }

    private static void doWork(Map<String, Worker> workerCache) {
        // no need to synchronize: accessed by a single thread (besides the Expirer)
        Worker worker = workerCache.get("key1");
        if (worker == null) {
            worker = new Worker();
            workerCache.put("key1", worker);
        }
        worker.doTheWork();
    }

    private static void sleepExact(long millis) {
        long start = System.nanoTime();
        while (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < millis) {
            Thread.yield();
        }
    }

    private static class Worker {

        AtomicBoolean stopped = new AtomicBoolean();

        private void doTheWork() {
            if (stopped.get()) {
                log("ERROR: " + this + " already stopped!");
                issueOccured = true;
            } else {
                //DO THE WORK
            }
        }

        private void stop() {
            log(this + " - stop...");
            boolean success = stopped.compareAndSet(false, true);
            if (success) {
                log(this + " - ...stopped");
            } else {
                log("ERROR: " + this + " - ... was already stopped!");
            }
        }
    }

    private static void log(String msg) {
        System.out.println(LocalTime.now() + "[" + Thread.currentThread().getName() + "]: " + msg);
    }

}
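Until the race is addressed in the map itself, `doWork` above could defensively treat a cached worker that has already been stopped as absent. This is an application-level sketch under assumptions: the `Worker` here is a hypothetical stand-in that exposes `isStopped()`, and the cache is typed as `ConcurrentMap` (ExpiringMap implements that interface) so that the conditional `remove(key, value)` and `putIfAbsent` can be used.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Application-level workaround sketch; Worker is a hypothetical stand-in.
final class StoppedWorkerGuard {

    static final class Worker {
        private final AtomicBoolean stopped = new AtomicBoolean();

        void stop() {
            stopped.set(true);
        }

        boolean isStopped() {
            return stopped.get();
        }
    }

    // Returns a cached worker that has not been stopped, replacing a stale one if needed.
    static Worker liveWorker(ConcurrentMap<String, Worker> cache, String key) {
        while (true) {
            Worker cached = cache.get(key);
            if (cached != null && !cached.isStopped()) {
                return cached;
            }
            if (cached != null) {
                // Drop the stale mapping only if it still points at this stopped worker.
                cache.remove(key, cached);
                continue;
            }
            Worker fresh = new Worker();
            Worker previous = cache.putIfAbsent(key, fresh);
            if (previous == null) {
                return fresh;
            }
            // Another thread inserted a worker first; loop and re-check it.
        }
    }
}
```

The conditional `remove` avoids discarding a fresh worker that another thread may have inserted in the meantime. This narrows the window rather than closing it: the Expirer can still stop a worker right after `liveWorker` returns it.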