al8n / stretto

Stretto is a Rust implementation of Dgraph's Ristretto (https://github.com/dgraph-io/ristretto), a high-performance, memory-bound cache.
Apache License 2.0

TTL does not really work #55

Open stalkerg opened 1 year ago

stalkerg commented 1 year ago

If I insert 10000 items with a 60-second TTL, my on_evict callback sees only ~1000 evicted items after 60 seconds. Checking len() on the cache also shows ~9000 items still present. I suppose the cleanup process is not working correctly and there is something wrong with the TTL map.

al8n commented 1 year ago

Hi, could you provide some code to help me reproduce your situation?

stalkerg commented 1 year ago

Yeah, sorry for filing the issue without a repro. I will try to make a small example.

stalkerg commented 1 year ago

@al8n Okay, here is a repro:

use tokio::time::{sleep, Duration};
use stretto::AsyncCache;

#[tokio::main]
async fn main() {
    let cache: AsyncCache<String, String> =
        AsyncCache::new(12960, 1e6 as i64, tokio::spawn).unwrap();

    // Insert 10000 items, each with a cost of 1 and a 60-second TTL.
    for i in 0..10000 {
        cache
            .insert_with_ttl(
                format!("key{}", i),
                format!("value{}", i),
                1,
                Duration::from_secs(60),
            )
            .await;
        // Spread the inserts out to simulate natural load.
        sleep(Duration::from_millis(1)).await;
    }

    // Wait until all pending writes have been applied.
    cache.wait().await.unwrap();

    println!("Current size: {}", cache.len());
    // Every item should have expired long before 100 seconds have passed.
    sleep(Duration::from_secs(100)).await;
    println!("New size: {}", cache.len());
}

On my machine, I get something like this:

Current size: 10000
New size: 4824

If I remove the sleep(Duration::from_millis(1)).await; between inserts, it works as expected. I use the sleep to simulate natural load behavior.

Regards,

stalkerg commented 1 year ago

I hope you can reproduce my issue.

stalkerg commented 1 year ago

@al8n, maybe I can look into it next week, but I suppose it's a bug.

stalkerg commented 1 year ago

Okay, I found why: in https://github.com/al8n/stretto/blob/main/src/store.rs#L285C29-L285C40 there is no guarantee that try_cleanup_async will be invoked strictly every second. I tested it by printing the current bucket on each cleanup attempt, and from time to time the cleanup skips seconds, so those buckets are never removed.
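
To make the failure mode concrete, here is a minimal sketch of the bucketed-TTL cleanup (the constants and function names are illustrative, not stretto's actual internals): expiring keys are grouped into time buckets, and each cleanup pass removes only the one bucket computed from the current time, so a skipped tick strands its bucket forever.

use std::collections::HashMap;

// Assumed one-second buckets, purely for illustration.
const BUCKET_DURATION_SECS: i64 = 1;

fn storage_bucket(unix_secs: i64) -> i64 {
    unix_secs / BUCKET_DURATION_SECS + 1
}

fn cleanup_bucket(unix_secs: i64) -> i64 {
    // The bucket whose entries have just expired.
    storage_bucket(unix_secs) - 1
}

fn main() {
    let mut buckets: HashMap<i64, Vec<u64>> = HashMap::new();

    // Keys inserted at seconds 10, 11 and 12 land in three different buckets.
    for t in [10i64, 11, 12] {
        buckets.entry(storage_bucket(t)).or_default().push(t as u64);
    }

    // The cleanup task fires at t = 11, then misses a tick and fires at t = 13.
    for now in [11i64, 13] {
        let removed = buckets.remove(&cleanup_bucket(now));
        println!("cleanup at t = {now}: removed {removed:?}");
    }

    // The bucket for keys inserted at t = 11 was never the "current" cleanup
    // bucket, so it is leaked, just like the items that never expire.
    println!("leaked: {buckets:?}");
}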

stalkerg commented 1 year ago

Okay, this one works, but I am not sure how efficient it is:

diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..1b7c36a 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -3,6 +3,7 @@ use std::collections::{hash_map::RandomState, HashMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use itertools::Itertools;

 use crate::CacheError;

@@ -200,11 +201,24 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {

     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().keys().sorted().cloned().collect();
+        // println!("try_cleanup bucket_num: {} buckets:{:#?}", bucket_num, bucket_keys);
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .filter(|key| **key < bucket_num)
+            .map(|key| {
+                self
+                .buckets
+                .write()
+                .remove(key)
+                .map(|bucket| bucket.map)
+            }) {
+                if map.is_some() {
+                    ret_map.extend(map.unwrap().iter());
+                }
+        }
+        Ok(Some(ret_map))
     }

     pub fn hasher(&self) -> S {

It would be better to use a BTreeMap and range() for fast filtering over the index.

stalkerg commented 1 year ago

Okay, here is the option with BTreeMap and range:

diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..e72ed4d 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -1,5 +1,5 @@
 use parking_lot::RwLock;
-use std::collections::{hash_map::RandomState, HashMap};
+use std::collections::{hash_map::RandomState, HashMap, BTreeMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -100,7 +100,7 @@ impl<S: BuildHasher> DerefMut for Bucket<S> {

 #[derive(Debug)]
 pub(crate) struct ExpirationMap<S = RandomState> {
-    buckets: RwLock<HashMap<i64, Bucket<S>, S>>,
+    buckets: RwLock<BTreeMap<i64, Bucket<S>>>,
     hasher: S,
 }

@@ -108,7 +108,7 @@ impl Default for ExpirationMap {
     fn default() -> Self {
         let hasher = RandomState::default();
         Self {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -123,7 +123,7 @@ impl ExpirationMap {
 impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub(crate) fn with_hasher(hasher: S) -> ExpirationMap<S> {
         ExpirationMap {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -200,11 +200,22 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {

     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().range(..bucket_num).map(|(key, _)| *key).collect();
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .map(|key| {
+                self
+                .buckets
+                .write()
+                .remove(key)
+                .map(|bucket| bucket.map)
+            }) {
+                if map.is_some() {
+                    ret_map.extend(map.unwrap().iter());
+                }
+        }
+        Ok(Some(ret_map))
     }

     pub fn hasher(&self) -> S {
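
As a side note on this patch: with a BTreeMap, the expired prefix can be detached under a single write lock using BTreeMap::split_off, instead of re-taking the lock for every key. A rough sketch, assuming the parking_lot::RwLock the file already uses and a plain HashMap<u64, u64> bucket payload (the real map is generic over the hasher):

use std::collections::{BTreeMap, HashMap};

use parking_lot::RwLock;

// Illustrative stand-in; the real bucket map is generic over the hasher.
type Bucket = HashMap<u64, u64>;

fn try_cleanup(buckets: &RwLock<BTreeMap<i64, Bucket>>, bucket_num: i64) -> Bucket {
    let mut guard = buckets.write();

    // split_off(&k) returns the entries with keys >= k (the still-live
    // buckets) and leaves keys < k (the expired ones) behind, so swap them.
    // Note: the original code also removed the bucket_num bucket itself; to
    // keep that behavior, split at bucket_num + 1 instead.
    let live = guard.split_off(&bucket_num);
    let expired = std::mem::replace(&mut *guard, live);
    drop(guard); // release the write lock before merging the expired maps

    let mut ret = Bucket::new();
    for (_, map) in expired {
        ret.extend(map);
    }
    ret
}

Whether this is faster in practice depends on how many buckets pile up between ticks, but it keeps the scan and removal atomic with respect to concurrent inserts.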

al8n commented 1 year ago

Sorry for the late response, and thanks! Would you mind opening a PR so we can see whether we can merge it?

stalkerg commented 1 year ago

I don't think I will have time in the next few weeks.