Closed azusachino closed 1 year ago
👍🏻
补充两个个人的想法:
如果有疑问或不同建议,我们可以再进一步沟通 😀
sqlite是关系型数据库,所以用到了 rsql_builder 这样的工具进行辅助操作,并且也是符合 CRUD 设计的;
换到 sled 我没找错地方吧 这样的 kv 数据库之后,数据库设计这块是怎么考虑的呢?tb_config <-> sled::open("config"), sqlite::fetch(sql, args) <-> 对应什么呢?
目前工作中,只用过 redis 这样的 kv 数据库,认知也就停留在用 key 存取。。。
sled 的使用思路定下来之后,可以封装好工具类来处理多种数据类型吧。【设想】
目前 rnacos 使用sqlite 只有3个场景:
只需要在ConfigActor层面的3个场景做替换即可,不需要到 sqlite::fetch 层面做替换(上面的很多 sql 是通过工具生成的目前用不到)。
关于sled 与关系数据库的比较:
tree 的使用可以参考 sled项目的 examples/structured.rs
sled 的使用思路定下来之后,可以封装好工具类来处理多种数据类型吧。【设想】
赞同
大致这样改写了一版? 不知道是否符合你的预期;KV数据库,就做不到 select * from tb_config_history where a = 1, b = 2 这样的查询了哦;
顺便,do, dto, param, innersql 这么多类,确实把我绕晕进去了。。 另外可能就是,我有点代码洁癖,格式化必不可少,warning 也会尽量解决,,,目前用 cargo clippy 看 warning 还是蛮多的。。感觉我一旦提PR,会有非常多 conflicts ...
pub struct ConfigDB {
config_db: sled::Db,
config_history_db: sled::Db,
}
impl Default for ConfigDB {
fn default() -> Self {
Self::new()
}
}
impl ConfigDB {
pub fn new() -> Self {
let sys_config = AppSysConfig::init_from_env();
let config_db_path = format!("{}/config", sys_config.config_db_dir);
let config_history_db_path = format!("{}/config_history", sys_config.config_db_dir);
let config_db = sled::Config::new()
.mode(sled::Mode::HighThroughput)
.path(config_db_path)
.open()
.unwrap();
let config_history_db = sled::Config::new()
.mode(sled::Mode::LowSpace)
.path(config_history_db_path)
.open()
.unwrap();
Self {
config_db,
config_history_db,
}
}
pub fn update_config(&self, key: &ConfigKey, val: &ConfigValue) -> anyhow::Result<()> {
let config = Config {
id: None,
tenant: key.tenant.as_ref().to_owned(),
group: key.group.as_ref().to_owned(),
data_id: key.data_id.as_ref().to_owned(),
content: Some(val.content.as_ref().to_owned()),
content_md5: None,
last_time: Some(Local::now().timestamp_millis()),
};
let db_key = config.get_key();
// has old data
if let Ok(Some(config_bytes)) = self.config_db.get(&db_key) {
// let old_config = serde_json::from_slice::<Config>(&config_bytes).unwrap();
let iter = self.config_history_db.scan_prefix(&db_key);
// check if has any latest history
let mut new_key = String::new();
new_key.push_str(&db_key);
new_key.push('_');
if let Some(Ok((lk, _))) = iter.last() {
let pre_key = String::from_utf8(lk.to_vec())?;
let index = pre_key.split('_').last();
match index {
Some(index) => {
let nk = index.parse::<u32>()? + 1;
new_key.push_str(&nk.to_string());
}
None => {
new_key.push_str("1");
}
}
} else {
// insert brand new config history
new_key.push_str("1");
}
self.config_history_db.insert(new_key, config_bytes)?;
}
let config_bytes = serde_json::to_vec(&config)?;
self.config_db.insert(&db_key, config_bytes)?;
Ok(())
}
pub fn del_config(&self, key: &ConfigKey) -> anyhow::Result<()> {
let db_key = key.get_key();
if let Ok(Some(_)) = self.config_db.remove(&db_key) {
let mut iter = self.config_history_db.scan_prefix(&db_key);
while let Some(Ok((k, _))) = iter.next() {
self.config_history_db.remove(k)?;
}
}
Ok(())
}
pub fn query_config_list(&self) -> anyhow::Result<Vec<Config>> {
let mut ret = vec![];
let mut iter = self.config_db.iter();
while let Some(Ok((_, v))) = iter.next() {
let config = serde_json::from_slice::<Config>(&v)?;
ret.push(config);
}
Ok(ret)
}
// total, current list
pub fn query_config_history_page(
&self,
param: &ConfigHistoryParam,
) -> anyhow::Result<(usize, Vec<Config>)> {
if let (Some(t), Some(g), Some(id)) = (¶m.tenant, ¶m.group, ¶m.data_id) {
let key = format!("{}_{}_{}", t, g, id);
// count total using new iter, for count will use the iter
let total = self.config_history_db.scan_prefix(&key).count();
// 暂时先实现个自然插入序版本, AAAAA, 为什么要用 option...
let iter = self.config_history_db.scan_prefix(&key);
let mut ret = vec![];
if let Some(offset) = param.offset {
let mut n_i = iter.skip(offset as usize);
if let Some(limit) = param.limit {
let mut t = n_i.take(limit as usize);
while let Some(Ok((_, v))) = t.next() {
ret.push(serde_json::from_slice::<Config>(&v)?);
}
} else {
while let Some(Ok((_, v))) = n_i.next() {
ret.push(serde_json::from_slice::<Config>(&v)?);
}
}
}
return Ok((total, ret));
}
Ok((0, vec![]))
}
}
trait KeyGetter {
fn get_key(&self) -> String;
}
#[derive(Clone, PartialEq, Message, Deserialize, Serialize)]
pub struct Config {
#[prost(int64, tag = "1")]
pub id: i64,
#[prost(string, tag = "2")]
pub tenant: String,
#[prost(string, tag = "3")]
pub group: String,
#[prost(string, tag = "4")]
pub data_id: String,
#[prost(string, tag = "5")]
pub content: String,
#[prost(string, tag = "6")]
pub content_md5: String,
#[prost(int64, tag = "7")]
pub last_time: i64,
}
impl KeyGetter for Config {
fn get_key(&self) -> String {
format!("{}_{}_{}", self.tenant, self.group, self.data_id)
}
}
trait DbHelper<T: KeyGetter> {
/**
* 获取数据库前缀
*/
fn get_prefix(&self) -> String;
/**
* 获取数据库key
*/
fn get_key(&self, t: &T) -> String {
format!("{}{}", self.get_prefix(), t.get_key())
}
}
#[derive(Default)]
struct ConfigDbHelper {}
impl DbHelper<Config> for ConfigDbHelper {
fn get_prefix(&self) -> String {
"config$".to_owned()
}
}
#[derive(Default)]
struct ConfigHistoryDbHelper {}
impl DbHelper<Config> for ConfigHistoryDbHelper {
fn get_prefix(&self) -> String {
"confighistory$".to_owned()
}
}
pub struct ConfigDB {
inner_db: sled::Db,
config_helper: ConfigDbHelper,
config_history_helper: ConfigHistoryDbHelper,
}
impl ConfigDB {
pub fn new() -> anyhow::Result<Self> {
let sys_config = AppSysConfig::init_from_env();
let db = sled::open(sys_config.config_db_dir)?;
// 慎用 ..Default::default(), 会触发当前 struct 本身的 Default::default().
Ok(Self {
inner_db: db,
// 结果导致 open 了两次 db,一直 get_lock failed...
config_helper: ConfigDbHelper::default(),
config_history_helper: ConfigHistoryDbHelper::default(),
})
}
pub fn update_config(&self, key: &ConfigKey, val: &ConfigValue) -> anyhow::Result<()> {
let config = Config {
id: 0,
tenant: key.tenant.as_ref().to_owned(),
group: key.group.as_ref().to_owned(),
data_id: key.data_id.as_ref().to_owned(),
content: val.content.as_ref().to_owned(),
content_md5: Default::default(),
last_time: Local::now().timestamp_millis(),
};
let config_key = self.config_helper.get_key(&config);
// has old data
if let Ok(Some(config_bytes)) = self.inner_db.get(&config_key) {
let config_his_key = self.config_history_helper.get_key(&config);
let iter = self.inner_db.scan_prefix(&config_his_key);
// check if has any latest history
let mut new_his_key = String::new();
new_his_key.push_str(&config_his_key);
new_his_key.push('_');
if let Some(Ok((lk, _))) = iter.last() {
let pre_key = String::from_utf8(lk.to_vec())?;
let index = pre_key.split('_').last();
match index {
Some(index) => {
let nk = index.parse::<u32>()? + 1;
new_his_key.push_str(&nk.to_string());
}
None => {
new_his_key.push('1');
}
}
} else {
// insert brand new config history
new_his_key.push('1');
}
self.inner_db.insert(new_his_key, config_bytes)?;
}
// using protobuf as value serialization
// let config_bytes = serde_json::to_vec(&config)?;
let mut config_bytes = Vec::new();
config.encode(&mut config_bytes)?;
self.inner_db.insert(&config_key, config_bytes)?;
Ok(())
}
pub fn del_config(&self, key: &ConfigKey) -> anyhow::Result<()> {
let cfg = Config {
tenant: key.tenant.as_ref().to_owned(),
group: key.group.as_ref().to_owned(),
data_id: key.data_id.as_ref().to_owned(),
..Default::default()
};
let config_key = self.config_helper.get_key(&cfg);
if let Ok(Some(_)) = self.inner_db.remove(&config_key) {
let his_key = self.config_history_helper.get_key(&cfg);
let mut iter = self.inner_db.scan_prefix(&his_key);
while let Some(Ok((k, _))) = iter.next() {
self.inner_db.remove(k)?;
}
}
Ok(())
}
pub fn query_config_list(&self) -> anyhow::Result<Vec<Config>> {
let mut ret = vec![];
let his_prefix = self.config_helper.get_prefix();
let mut iter = self.inner_db.scan_prefix(&his_prefix);
while let Some(Ok((_, v))) = iter.next() {
// let cfg = NacosConfig::decode(v.to_vec())?;
let cfg = Config::decode(v.as_ref())?;
ret.push(cfg);
}
Ok(ret)
}
// total, current list
pub fn query_config_history_page(
&self,
param: &ConfigHistoryParam,
) -> anyhow::Result<(usize, Vec<Config>)> {
if let (Some(t), Some(g), Some(id)) = (¶m.tenant, ¶m.group, ¶m.data_id) {
let his_key = format!(
"{}{}_{}_{}",
self.config_history_helper.get_prefix(),
t,
g,
id
);
// count total using new iter, for count will use the iter
let total = self.inner_db.scan_prefix(&his_key).count();
let iter = self.inner_db.scan_prefix(&his_key);
let mut ret = vec![];
if let Some(offset) = param.offset {
let mut n_i = iter.skip(offset as usize);
if let Some(limit) = param.limit {
let mut t = n_i.take(limit as usize);
while let Some(Ok((_, v))) = t.next() {
ret.push(Config::decode(v.as_ref())?);
}
} else {
while let Some(Ok((_, v))) = n_i.next() {
ret.push(Config::decode(v.as_ref())?);
}
}
}
return Ok((total, ret));
}
Ok((0, vec![]))
}
}
第1点,用protobuf性能会好些。第一版本用 json 也可以接受,先让它能工作,后面再调整也行。
第2点,prost 可以通过 #[prost(type, optional, tag = "num")]
支持 option
#[prost(int64, optional , tag = "7")]
pub last_time: Option<i64>,
第3点,通过key_prefix的话,对单表的遍历支持不太友好,使用sled 的 tree 比较合适些。
一个tree简化的使用片段
//db: &sled::Db
//open_tree
let configs = db.open_tree(b"config")?;
//insert to tree
configs.insert(key1,value1)?;
configs.insert(key2,value2)?;
//get from tree by key
let value1 = configs.get(&key1)?;
//scan tree item
for item in &configs {
let (key,value) = item?;
//do something
}
//tree iter
let config_iter = configs.iter()
//open other tree
let config_historys = db.open_tree(b"config_history")?;
// do something
sled 的 tree 使用方式可以参考 sled 源码中examples/structured.rs
其它:
对于表id,可以单独使用一个对象集中储存 各个表的last_id,比如上面例子中的config_history id。
关于 RP:
每个功能可以建一个功能的分支,小版本可以提到功能分支,整体完成后再 RP到develop,最后再从 develop merge 到 master 发布。
我把你加到collaborators 你写好可运行的小版本后,代码可以分多次提交到指定分支use_sled_db ,这样我可以拉取代码本地验证和反馈,效率会高一些。
提交了一个版本,新增、历史记录功能正常,check commit
主体功能基本没有问题,就是配置历史记录查询排序有点问题。
一个配置的历史记录也可以考虑用一个单独的tree,查询时直接分页查这个tree.iter().rev()即可。
今天还发现一个历史记录功能问题:
原历史记录最后新一条记录的内容和配置内容是一致的,sled 分支历史记录最后新一条记录的内容是配置的上个版本信息。 例子: 一个配置依次更新3次, A -> B -> C (C 是最新的记录) 原历史记录为[C,B,A]有3条 ,sled分支的历史记录为[B,A] 只有2条 。
这两种记录方式从信息量上差别不大,但是控制台前端逻辑和原版本的逻辑强相关(认为最新的历史记录内容就是当前配置最新的内容);所以还是需要和sqlite版本逻辑保持一致。
大概修了一版,不过,感觉并发情况下问题还挺多的。。
这部分是在actor内调用(可以理解为只有一个线程再调用),不会有并发问题。
rnacos是基于actor并发模型actix框架开发,在actor内的操作都是单线程的,不用再对并发做特殊处理。
我后面抽空写个actor并发模型与actix的使用说明。
这个版本历史记录功能应该已能基本解决。
目前的版本有几个问题点:
//示例代码,只含主体部分
//1. 在ConfigDB 中直接维护历史 id
pub struct ConfigDB {
//...
config_history_id: i64,
}
//2. 在初始化ConfigDB 时,从 sled 加载历史 id
fn init(&mut self) {
if let Ok(Some(lastst_id_bytes)) = self.table_id_tree.get("config_history_id") {
let latest_id = decode(lastst_id_bytes)?;
self.config_history_id = latest_id;
}
}
//3. 获取新的 id,并持久化到 db中; 不同配置的历史记录可以复用同一个history_id
fn next_history_id(&mut self) -> Result<i64> {
self.config_history_id+=i;
let latest_id_bytes = encode(self.config_history_id)?;
self.table_id_tree.insert("config_history_id",latest_id_bytes)?;
Ok(self.config_history_id)
}
DB
取。如果从 DB取代价不大的话,可以考虑都在运行时从DB 取。这里一个是空间代价一个是时间代价,那个代价小可以选哪个,如果都差不多的话优先选内存小的(配置更新和配置历史记录查询才用到history_tree,这些是相对低频的使用)。这里的 2 具体指什么啊,有点没读懂。。 3 的话,不是说,后续会统一切换到 sled 吗?不仅仅是 config 这个 mod 使用。
- 感觉history_id实现上有的复杂。 每个历史id不支持超过MAX_HIS_CNT,而对超出这个 id目前没有做对应的处理,这个从功能正确性看是有问题的。 (如果只保留最近的MAX_HIS_CNT条历史记录是可以接受的,不过这部分的ID 也是一直向上增加的。)
引入这个 MAX_HIS_CNT 主要是为了,在插入 config_history 计算 key 的时候,可以使用 format!("{0:>4}")
这样的形式,以保证插入顺序,因为直接使用 i64::to_ne_bytes()
的时候,历史记录是乱序的。有没有其他好办法?
第1点用to_be_bytes()应该不会乱序,等下我也测试一下。
第3点,后继后统一切换成 sled,多个 Arc
第2点,主要是看给每个config都创建一个历史记录 tree 放到内存中,不确定这块内存消耗多少,是否有必要。这个也算优化,可以暂时不动。
先保证功能正确性,优化相关的可以后面再处理。
换成to_be_bytes()后,历史记录超过1万条,没有乱序。 更新配置 rt在0.5ms 左右,查询历史记录rt在30ms左右,tps在4500左右(只用单进程简单压测debug版本服务,应该还有提升空间)
目前功能上已经达成。
后面确认几方面的内容后就可合并到主干:
上面第1、3点已确认没问题。
第2点:config_history表名已增加前缀并验证通过。
第6点:我在最新develop分支增加了压测工具,用压测工具对 sled 版本做压测,性能提升非常显著。sled 的写入 qps 60秒平均能在1万4左右,是sqlite版本的二十几倍。
不过sled版本的内存使用量比较大。180个配置,每个配置有3千条压测历史记录,启动后什么事不干就占用75MB。 比 sqlite 版本的不到3MB 有明显的增长。 配置在内存的占用量应该不会这么高,可能和 sled tree 的缓存有关。 这个点可以后面再优化。
@azusachino 这个分支从功能、性能上都已满足,多占用几十 MB的内存算一个小问题,可以考虑后面再优化。 如果你这边没有其它意见的话,这个分支可以合并到 develop,这个 issue 可以考虑完结。
然后分支合并后我们在 develop 统一处理cargo clippy的问题,并对整个项目使用cargo fmt --all 做格式化。
@heqingpan 可以的,缓存优化的方向我可以开始调研。
(端午假期稍微休息了一下)
那我合并下分支到 develop,处理cargo clippy 和cargo fmt。 develop 分支稳定后再合并到 master。
假期休息很正常,我也稍微休息了一下。 毕竟是用爱发电,不要求时间,贵在坚持,共勉。
我昨天做了一版sled 内存调优,变更分支: optimize_sled (基于最新的 develop 分支)
分析内存占比:
优化策略:
优化效果:
qps 还是1.5 万左右;
内存:
我还写了另一个限制配置历史记录数据的测试版本,每次更新配置时,如果历史记录数据大于300就删除最早的一条记录,保证记录不会超过300条。这个版本压测150配置时内存能保持到35M以内,不过 qps 下降到7、8千; 因为一般使用配置历史记录不会超限,这个逻辑就先不上。(如果有需要可以考虑再另起一个线程延后批量处理记录,这样 qps 应该不会下降太多)
内存优化后,启动几十个手动配置在25M 以内,这个内存可接受。
@azusachino 你这边也可以抽空看看,如果没有问题我后面就合并到主干并打包新的版本。
原 sqlite 版本是0.1.x , sled 与 sqlite不兼容版本会切到0.2.x 。
已合并的主干分支,并发布新的版本0.2.0
TODO: