wvwwvwwv / scalable-concurrent-containers

High performance containers and utilities for concurrent and asynchronous programming
Apache License 2.0
285 stars 14 forks

[request] Cache map with duplicate keys "waiting" in a queue #138

Closed milesj closed 3 months ago

milesj commented 3 months ago

Hey all, great library. I have a use case that I'm trying to solve in an elegant way, and I'm curious how I would solve this with scc.

In simple terms, I'm trying to cache the results of git commands, in an effort to reduce the number of calls being made. The type of this structure is very simple: HashCache<String, Arc<String>> (or HashMap). While this does work, it doesn't stop the same command from being executed simultaneously from many threads. For example, this kind of output appears in our logs consistently:

[DEBUG 16:20:42.985] moon_process::command_inspector  Running command `git ls-files --full-name --cached --modified --others --exclude-standard website --deduplicate`
[DEBUG 16:20:42.985] moon_process::command_inspector  Running command `git ls-files --full-name --cached --modified --others --exclude-standard website --deduplicate`
[DEBUG 16:20:42.985] moon_process::command_inspector  Running command `git ls-files --full-name --cached --modified --others --exclude-standard website --deduplicate`
[DEBUG 16:20:42.985] moon_process::command_inspector  Running command `git ls-files --full-name --cached --modified --others --exclude-standard website --deduplicate`

Ideally it's just 1 statement:

[DEBUG 16:20:42.985] moon_process::command_inspector  Running command `git ls-files --full-name --cached --modified --others --exclude-standard website --deduplicate`

Since these are all executed at basically the same millisecond, they all trigger a cache miss, and then each still runs to completion.

I've temporarily worked around this with a secondary map of HashMap<String, Arc<Mutex<()>>>, which causes subsequent commands to "wait" until the previous one is complete, and then either read from or write to the cache. While this works, I was hoping there's a better way to handle it. The pseudo-code looks like the following:

// First check if the data has already been cached
if let Some(cache) = self.cache.get(&cache_key) {
    return Ok(Arc::clone(cache.get()));
}

// Otherwise wait in the queue
let entry = self
    .queue
    .entry(cache_key.clone())
    .or_insert_with(|| Arc::new(Mutex::new(()))); // an async-aware Mutex, e.g. tokio's
let _guard = entry.value().lock().await;

// Check the cache again in case the data was written in between locks
if let Some(cache) = self.cache.get(&cache_key) {
    return Ok(Arc::clone(cache.get()));
}

// We're good, nothing in cache, so write to the cache!
let cache = Arc::new(do_operation());

let _ = self.cache.insert(cache_key, Arc::clone(&cache));

Ok(cache)
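
For reference, the pattern above (check, take a per-key lock, check again, then compute) can be sketched with std-only primitives. This is a hypothetical, synchronous illustration: `KeyedCache`, `get_or_compute`, and `compute` are made-up names, and a real async version would use an async-aware Mutex instead of std's.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical sketch of the per-key-lock workaround. `compute` stands
// in for do_operation() (the git call).
struct KeyedCache {
    cache: Mutex<HashMap<String, Arc<String>>>,
    locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl KeyedCache {
    fn new() -> Self {
        Self {
            cache: Mutex::new(HashMap::new()),
            locks: Mutex::new(HashMap::new()),
        }
    }

    fn get_or_compute(&self, key: &str, compute: impl FnOnce() -> String) -> Arc<String> {
        // First check if the data has already been cached.
        if let Some(v) = self.cache.lock().unwrap().get(key) {
            return Arc::clone(v);
        }
        // Otherwise wait on the per-key lock.
        let key_lock = Arc::clone(self.locks.lock().unwrap().entry(key.to_owned()).or_default());
        let _guard = key_lock.lock().unwrap();
        // Check again: another thread may have filled the cache while we waited.
        if let Some(v) = self.cache.lock().unwrap().get(key) {
            return Arc::clone(v);
        }
        let value = Arc::new(compute());
        self.cache.lock().unwrap().insert(key.to_owned(), Arc::clone(&value));
        value
    }
}

fn main() {
    let cache = KeyedCache::new();
    let a = cache.get_or_compute("git status", || "clean".to_string());
    // The second closure never runs: the cached result is reused.
    let b = cache.get_or_compute("git status", || "dirty".to_string());
    assert_eq!(*a, "clean");
    assert_eq!(*b, "clean");
}
```

The double-check after acquiring the lock is what prevents duplicate work: a thread that lost the race finds the value already inserted and returns it instead of recomputing.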

It would be great if this library supported this natively (maybe it does and I've been using it wrong?).

Another trick I tried is to utilize the scc Entry, but this doesn't actually "lock" the row, so we still have multi-write problems.

wvwwvwwv commented 3 months ago

Hi,

My suggestion is to first check whether the data is cached using read or read_async, which acquire only a shared lock on the entry (allowing others to read the same cached entry at the same time). If it is not cached, use entry or entry_async. _Note that entry and entry_async exclusively lock the row._

I'm not entirely sure I understood the semantics of the code snippet correctly, but my version of the code would look like this. First of all, I don't think you need an additional mutex here, so the type of the HashMap can simply be: HashMap<String, Arc<String>>.

// First check if the data has already been cached.
//
// Concurrent access to the entry is allowed.
if let Some(cache) = self.cache.read(&cache_key, |_, v| v.clone()) {
    return Ok(cache);
}

// Otherwise gain exclusive access to the key.
//
// It is the only thread accessing the key until returning the result.
let entry = self
    .cache
    .entry(cache_key.clone())
    .or_insert_with(|| do_operation() /* If the key does not exist, insert a new one */);

Ok(entry.get().clone())
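
(For illustration only: the read-then-entry flow suggested above corresponds roughly to this std-only sketch. Unlike scc, an RwLock serializes the whole map rather than a single entry, so this is a sketch of the idea, not a substitute; `get_or_compute` and `compute` are made-up names.)

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Fast path under a shared lock; exclusive lock plus re-check on a miss.
fn get_or_compute(
    map: &RwLock<HashMap<String, Arc<String>>>,
    key: &str,
    compute: impl FnOnce() -> String,
) -> Arc<String> {
    // Shared lock: concurrent readers of cached entries are allowed.
    if let Some(v) = map.read().unwrap().get(key) {
        return Arc::clone(v);
    }
    // Exclusive lock; re-check because another writer may have inserted
    // the key between dropping the read lock and taking the write lock.
    let mut guard = map.write().unwrap();
    if let Some(v) = guard.get(key) {
        return Arc::clone(v);
    }
    let value = Arc::new(compute());
    guard.insert(key.to_owned(), Arc::clone(&value));
    value
}

fn main() {
    let map = RwLock::new(HashMap::new());
    let a = get_or_compute(&map, "git ls-files", || "files".to_string());
    let b = get_or_compute(&map, "git ls-files", || "unused".to_string());
    assert_eq!(*a, "files");
    assert_eq!(*b, "files");
}
```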
milesj commented 3 months ago

@wvwwvwwv Thanks for the response. I tried this approach and it seems to work, but not always. I do see some duplicates appearing in the logs.

One difference though is that my do_operation is actually async code, so I had to split it up like this:

// First check if the data has already been cached
if let Some(cache) = self.cache.read(&cache_key, |_, v| v.clone()) {
    return Ok(cache);
}

// Otherwise acquire an entry to lock the row
let entry = self.cache.entry(cache_key);

let output = executor.exec_capture_output().await?;
let value = output_to_string(&output.stdout);
let cache = Arc::new(format(if trim { value.trim().to_owned() } else { value }));

entry.put_entry(Arc::clone(&cache));

Does the entry still lock the row like this? Maybe I should try the async methods in this context.

milesj commented 3 months ago

I tried the async methods and it works much better, in like 99% of cases so far. I did see 1 duplicate, which was weird, since there was actually a few milliseconds' difference between them.

[DEBUG 10:32:22.298] moon_process::command_inspector  Running command git ls-files --full-name --cached --modified --others --exclude-standard packages/runtime --deduplicate  env_vars={} working_dir=Some("/Users/miles/Projects/moon")
[DEBUG 10:32:22.304] moon_process::command_inspector  Running command git ls-files --full-name --cached --modified --others --exclude-standard packages/runtime --deduplicate  env_vars={} working_dir=Some("/Users/miles/Projects/moon")

We can probably close this thread unless you have other feedback.

wvwwvwwv commented 3 months ago

I think you'll want to use the async methods in order to avoid potential deadlocks (sync locking in async code is bad in general).

let entry = self.cache.entry_async(cache_key).await; // Async!!
match entry {
    // There is a chance that the same key was inserted in the meantime, so check if the entry is occupied!
    Entry::Occupied(o) => o.get().clone(),
    Entry::Vacant(v) => {
        let output = executor.exec_capture_output().await?;
        let value = output_to_string(&output.stdout);
        let cache = Arc::new(format(if trim { value.trim().to_owned() } else { value }));
        v.insert_entry(cache).get().clone()
    }
}
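
(Aside, for readers following along: the occupied/vacant match above mirrors std's hash_map::Entry API, so the check-then-insert idiom can be shown with a minimal synchronous std-only sketch. `get_or_insert` and `build` are hypothetical names; `build` stands in for running the git command, and with scc's entry_async the row stays exclusively locked while it runs.)

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

// Check-then-insert via the Entry API: `build` runs at most once per key.
fn get_or_insert(
    cache: &mut HashMap<String, Arc<String>>,
    key: String,
    build: impl FnOnce() -> String,
) -> Arc<String> {
    match cache.entry(key) {
        // The key was inserted in the meantime: reuse it and skip `build`.
        Entry::Occupied(o) => Arc::clone(o.get()),
        // First caller for this key: run the expensive operation once.
        Entry::Vacant(v) => Arc::clone(v.insert(Arc::new(build()))),
    }
}

fn main() {
    let mut cache = HashMap::new();
    let a = get_or_insert(&mut cache, "git ls-files".into(), || "files".to_string());
    let b = get_or_insert(&mut cache, "git ls-files".into(), || "unused".to_string());
    assert_eq!(*a, "files");
    assert_eq!(*b, "files");
}
```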
milesj commented 3 months ago

Awesome, thanks, works like a charm!