Open adambezecny opened 3 years ago
There's no reason other than that I haven't needed it and no one's asked for it yet. We could add an option to the macro #[cached(rwlock)]
and have it use https://docs.rs/async-rwlock/1.3.0/async_rwlock/ instead of the async-mutex. I'm not sure exactly how you're using this, but it's good to keep in mind that if your cached function is going to be called frequently with new values (that would cause "writes"), then a rwlock may increase the latency of your readers (calls with arguments that are cached), as opposed to read and write being equally "not the fastest" with a regular mutex: https://blog.nelhage.com/post/rwlock-contention/
Ah.. you know what... I forgot the reason a RwLock won't work is because of the signature
/// Attempt to retrieve a cached value
fn cache_get(&mut self, k: &K) -> Option<&V>;
which needs to be able to mutate the cache to update the hit rate statistics, and in some cases (LRU and Timed) modify the internal data of the cache on reads.
Since a RwLock (correctly) prevents you from mutating on reads, the only scenario where you could use a RwLock is if the backing cache is a plain HashMap with no extra features. So in order to support using a RwLock, the cached
macro would either need to restrict you to only using a HashMap as the cache-type, or introducing a second trait CachedPlain
which has only
fn cache_get(&self, k: &K) -> Option<&V>;
fn cache_set(&mut self, k: K, v: V) -> Option<V>;
and calling the differing trait methods cached::Cached::cache_get(&mut cache, &key)
vs cached::CachedPlain::cache_get(&cache, &key)
depending on the macro arguments (rwlock = true
).
Note, this would mean you can't use fancier cache stores like the SizedCache
/ TimedCache
/ SizedTimedCache
with a #[cached(rwlock = true)]
since they require mutating their data on reads.
we are actually going to use it without parameters, see bellow:
async fn get_new_token() -> Result<VapToken> {
let req_body = format!(
r#"
{{
"strategy": "local",
"email": "{email}",
"password": "{password}"
}}
"#,
email = VAP_AUTH.svc_acc_usr,
password = VAP_AUTH.svc_acc_pwd,
);
let mut headers = HeaderMap::new();
headers.insert("Content-Type", HeaderValue::from_str("application/json")?);
trace!("/vapapi/authentication/v1 body {}", req_body);
let resp = HTTP_CLIENT
.post(&format!(
"{}/vapapi/authentication/v1",
VAP_AUTH.vap_base_url
))
.body(req_body)
.headers(headers)
.send()
.await?;
let status = resp.status();
debug!("status: {}", status);
let resp = resp.text().await?; // extract body as string
eval_status_code_reqwest!(
status,
reqwest::StatusCode::CREATED,
resp,
"error when calling get_new_token"
);
Ok(serde_json::from_str(&resp)?)
}
/// Returns VAP bearer token as a string
/// Result is cached/memoized for 60*60*23 seconds, i.e. 23 hours
/// which leaves one hour safety buffer (VAP token validity is 24 hours)
/// cache only Ok values, not errors
#[cached(size = 1, time = 82800, result = true)]
pub async fn get_token() -> Result<String> {
let new_token = get_new_token().await?;
Ok(new_token.access_token)
}
_gettoken (no input params, the relevant values - fixed URL + service account credentials - are taken from config file) will retrieve token via REST call and cache for 23 hours. So while we will be reading very frequently, writes will be very unfrequent. Hence RwLock would make perfect sense for our use case, even if this would limit us in available cache stores types (we don't need anything fancy here, in fact we are caching single value).
U quickly went through RWLock contention article pasted above but I don't think this is our case, we do not call with different params, readers will be extremely quick. All we need to do is to cache single value for 23 hours.
Ah I see. I don’t think I’ll have time to update the macro this week, so in the meantime you could emulate what the macro would be doing (and have a little more control over stopping a stampede):
disclaimer, I didn’t try compiling this
lazy_static! {
static ref CACHE: Arc<async_rwlock::RwLock<Vec<i64, String>>> = Arc::new(RwLock::new(Vec::with_capacity(1)));
}
async fn get_token() -> Result<String> {
{
let token = CACHE.read().await.get(0);
if let Some((created_ts, token)) = token {
if (now - created_ts) < 82800 { return Ok(token) }
}
}
let mut store = CACHE.write().await;
let already_set = store.get(0).map(|(created_ts, _)| (now - created_ts) < 82800).unwrap_or(false);
if already_set {
Ok(store[0].1.clone())
else {
let token = get_new_token().await?;
store.clear();
store.push((now, token.clone()));
Ok(token)
}
}
Thank you James for your quick support! I will implement workaround for now and looking forward updated macro.
cheers,
Adam
I did actually compile like this:
lazy_static! {
static ref CACHE: Arc<async_rwlock::RwLock<Vec<(u128, String)>>> =
Arc::new(RwLock::new(Vec::with_capacity(1)));
}
fn time_now() -> u128 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
}
async fn get_token() -> Result<String> {
{
let token = CACHE.read().await;
if let Some((created_ts, token)) = token.get(0) {
if (time_now() - *created_ts) < 82800 {
return Ok(String::from(token));
}
}
}
let mut store = CACHE.write().await;
let already_set = store
.get(0)
.map(|(created_ts, _)| (time_now() - created_ts) < 82800)
.unwrap_or(false);
if already_set {
return Ok(store[0].1.clone());
} else {
let token = get_new_token().await?;
store.clear();
store.push((time_now(), token.access_token.to_owned()));
Ok(token.access_token)
}
}
seems to be working but I think get_token it could be actually reduced to this
async fn get_token() -> Result<String> {
{
let token = CACHE.read().await;
if let Some((created_ts, token)) = token.get(0) {
if (time_now() - *created_ts) < 82800 {
return Ok(String::from(token));
}
}
}
let mut store = CACHE.write().await;
let token = get_new_token().await?;
store.clear();
store.push((time_now(), token.access_token.to_owned()));
Ok(token.access_token)
}
Also it seems (do not have hard data to support this, just subjective feeling from initial testing) that tokio rwlock is much faster:
static ref CACHE: Arc<tokio::sync::RwLock<Vec<(u128, String)>>> =
Arc::new(tokio::sync::RwLock::new(Vec::with_capacity(1)));
Nice. Yeah, the already_set
bit isn't required, but it would prevent multiple concurrent calls from calling get_new_token
when the token is missing or expired. Without it there may be multiple callers seeing that it's expired and then all queuing up to get write access, and then getting a new token even if something else just did so.
I haven't compared async_rwlock vs tokio, but that may be the case!
Maybe consider using dashmap? It is designed to be a high-performance hash map designed for concurrency.
I would say it actually fit quite well with caching.
There’s also another crate left_right that also provides better concurrency than RwLock<HashMap>
.
It is designed to be make reading scale linearly and not blocked by writing while making the writing slower, which IMHO fits in the pattern of caching, where cache hit should be as fast as possible while cache miss can be slower.
Edit:
It seems left_right suggests to use its high-level wrapper evmap instead of left_right directly.
Edit2:
concread seems to provide data structures works similar to evmap, but in additional to HashMap
, it also provides BTreeMap
and Adaptive Replacement Cache, the later might be more interesting than HashMap
.
hi,
any progress on this issue? Is it possible to implement this RwLock based cache as discussed?
@adambezecny I'll take another look at this tonight and add a #[once]
macro that implements the RwLock
(only caching a single value) version we were chatting about a while ago
@adambezecny A #[once]
macro is now available in 0.26.0
https://github.com/jaemk/cached/commit/0e36dbd894867a635c94e433604bb8333976ed24
hi James,
so to cache token for 23 hours I should use:
#[once(size = 1, time = 82800, result = true, sync_writes = true)]
pub async fn get_token() -> Result<String> {
let new_token = get_new_token().await?;
Ok(new_token.access_token)
}
This will use RwLock, i.e. all subsequent invocations within 23 hours interval will allow parallel reads (without getting lock). is that correct? Do I understand it correctly that it is sync_writes = true which causes to synchronize/lock write operations only? I.e. without *sync_writes = true Mutex is used, with *sync_writes = true RwLock is used? Correct?
regards,
Adam
@adambezecny
once
will always use a RwLock
, but when sync_writes = true
it will try to acquire the write-lock before executing the function that returns the value to cache (and will check if a valid cached value exists before executing) to prevent concurrent calls from executing the same logic. When sync_writes = false
(default), the write-lock is acquired after executing, only wrapping the setting of the cache.
once
also implies only one cached value, so there's no size
parameter for this one. The arguments you pass to the function are only used when there's no cached value or the cache has expired, and are otherwise ignored.
Concurrent calls will only do a RwLock::read
until the cached value has "expired" (then acquiring a write-lock), so reads should be parallel for the majority of calls.
ok, got it. so sync_writes = true basically does same above discussed section in manual code:
let already_set = store
.get(0)
.map(|(created_ts, _)| (time_now() - created_ts) < 82800)
.unwrap_or(false);
if already_set {
return Ok(store[0].1.clone());
} else {...
i.e. when there is no cached value and we need to get write lock (+ retrieve token and cache it) we make sure we get it only once while remaining callers will get cached value already. So final code:
#[once(time = 82800, result = true, sync_writes = true)]
pub async fn get_token() -> Result<String> {
let new_token = get_new_token().await?;
Ok(new_token.access_token)
}
Yes, correct!
hi James,
with following dependencies:
[dependencies]
tokio = { version = "1.7.1", default-features = false, features = ["macros", "time", "rt-multi-thread", "signal", "fs", "rt"] }
cached = "0.26.0"
[dev-dependencies]
criterion = { version = "0.3.5", features = ["async_tokio", "html_reports"] }
[[bench]]
name = "benchmarks"
harness = false
I ran this criterion bechmarks:
use criterion::{criterion_group, criterion_main, Criterion};
use cached::proc_macro::once;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use va_voice_gateway_rs::errors::Result;
use va_voice_gateway_rs::nlp::vap::{VapToken, VapTokenAuth, VapTokenUser};
lazy_static! {
static ref CACHE: Arc<tokio::sync::RwLock<Vec<(u128, String)>>> =
Arc::new(tokio::sync::RwLock::new(Vec::with_capacity(1)));
}
/// dummy function for getting new token. instead of doing respective http call
/// it simply returns hardcoded value.
async fn get_new_token() -> Result<VapToken> {
Ok(VapToken {
access_token: "123456".to_owned(),
authentication: VapTokenAuth {
strategy: "local".to_owned(),
},
user: VapTokenUser {
user_id: "usr123".to_owned(),
email: "foo@bar.com".to_owned(),
description: "some desc".to_owned(),
allowed_services: vec!["SvcA".to_owned(), "SvcB".to_owned()],
},
})
}
fn time_now() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
}
#[once(time = 82800, result = true, sync_writes = true)]
async fn get_token() -> Result<String> {
let new_token = get_new_token().await?;
Ok(new_token.access_token)
}
async fn get_token_manual() -> Result<String> {
{
let token = CACHE.read().await;
if let Some((created_ts, token)) = token.get(0) {
if (time_now() - *created_ts) < 82800 {
return Ok(String::from(token));
}
}
}
let mut store = CACHE.write().await;
// this is needed to prevent multiple writers from getting the lock
// one by one and retrieving token multiple times
let already_set = store
.get(0)
.map(|(created_ts, _)| (time_now() - created_ts) < 82800)
.unwrap_or(false);
if already_set {
Ok(store[0].1.clone())
} else {
let token = get_new_token().await?;
store.clear();
store.push((time_now(), token.access_token.to_owned()));
Ok(token.access_token)
}
}
pub fn benchmark_suite(c: &mut Criterion) {
// create the tokio runtime to be used for the benchmarks
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
c.bench_function("get_token", |b| {
b.to_async(&rt).iter(|| async {
get_token().await.unwrap();
})
});
c.bench_function("get_token_manual", |b| {
b.to_async(&rt).iter(|| async {
get_token_manual().await.unwrap();
})
});
}
criterion_group!(benches, benchmark_suite);
criterion_main!(benches);
Results:
get_token time: [219.76 ns 223.59 ns 227.74 ns]
change: [-1.7765% +0.3829% +2.4452%] (p = 0.73 > 0.05)
No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
4 (4.00%) high mild
1 (1.00%) high severe
get_token_manual time: [199.78 ns 202.23 ns 204.91 ns]
change: [-2.9261% -0.7982% +1.5430%] (p = 0.48 > 0.05)
No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
4 (4.00%) high mild
5 (5.00%) high severe
So performance is more-less same (slightly worse) then with manual implementation.
I think the difference might be because the macros expand to use async_std
's async_mutex
and async_rwlock
whereas the manual version you have is using tokio::sync::RwLock
Sorry to break the flow.
I need a Rwlocked hashmap, and I just found your crate.
The docs kind of imply this is already possible:
[cached] macros are thread-safe with the backing function-cache wrapped in a mutex/rwlock
Alas it's not. So just registering interest. Will try to use a dashmap for now.
hi,
I am working on high performance Smart IVR use case where we need to process massive amounts of audio data. App is built on tokio.rs. We are also calling some REST APIs from application which require bearer token. Since this is expiring token we need to cache it. Original solution used actor for token management, see https://tokio.rs/tokio/tutorial/shared-state#tasks-threads-and-contention
Basically we have spawned task that is running loop accepting GetTokenRequest from channel. Internally this components holds simple cache and refreshes token as needed. Overall design is rather awkward so I was thinking about replacing whole actor/token state manager with cache that will simply memoize our get_token function (returning Result<String, Err>) .
I have one doubt: you are using Mutex, not RwLock, which probably does have some performance penalty.
Are there any specific reason why cached does not use RwLock instead so that only update will do locking?
While using cached instead of custom actor/token manager greatly simplifies our design I am little bit afraid of using Mutex in the situation where we run thousands transactions in parallel (each requiring to retrieve bearer token). Eould this not lead to contention (too many tasks trying to lock single Mutex -> additional latency introduced in tokio.rs task processing)? Any advice here?
one important note: because of the nature of our app (smart ivr/voice chatbot) low latency is extremely important for us