Closed joycedoor closed 2 months ago
def merge_real_time_db(key, merge_path: str, db_paths: [str] or str):
"""
合并实时数据库消息,暂时只支持64位系统
:param key: 解密密钥
:param db_paths: 数据库路径
:param merge_path: 合并后的数据库路径
:return:
"""
try:
import platform
except:
raise ImportError("未找到模块 platform")
# 判断系统位数是否为64位,如果不是则抛出异常
if platform.architecture()[0] != '64bit':
raise Exception("System is not 64-bit.")
if isinstance(db_paths, str):
db_paths = [db_paths]
endbs = []
for db_path in db_paths:
if not os.path.exists(db_path):
# raise FileNotFoundError("数据库不存在")
continue
if "MSG" not in db_path and "MicroMsg" not in db_path and "MediaMSG" not in db_path:
# raise FileNotFoundError("数据库不是消息数据库") # MicroMsg实时数据库
continue
endbs.append(db_path)
endbs = '" "'.join(list(set(endbs)))
merge_path_base = os.path.dirname(merge_path) # 合并后的数据库路径
# 获取当前文件夹路径
current_path = os.path.dirname(__file__)
real_time_exe_path = os.path.join(current_path, "tools", "realTime.exe")
# 调用cmd命令
cmd = f'{real_time_exe_path} "{key}" "{merge_path}" "{endbs}"'
# os.system(cmd)
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=merge_path_base,
creationflags=subprocess.CREATE_NO_WINDOW)
# p.communicate()
# 查看返回值
out, err = p.communicate()
if out and out.decode("utf-8").find("SUCCESS") >= 0:
wx_core_loger.info(f"合并实时数据库成功{out}")
return True, merge_path
if err:
wx_core_loger.error(f"合并实时数据库失败\n{out}\n{err}")
return False, err
请修改这个函数,然后查看一下日志,看看返回内容是什么
直接运行的话,返回的是None,我print 出来了out 和 err,分别是 b'ERROR I 1.\r\nERROR 4.\r\n' 和 b''
def merge_real_time_db(key, merge_path: str, db_paths: [str] or str):
"""
合并实时数据库消息,暂时只支持64位系统
:param key: 解密密钥
:param db_paths: 数据库路径
:param merge_path: 合并后的数据库路径
:return:
"""
try:
import platform
except:
raise ImportError("未找到模块 platform")
# 判断系统位数是否为64位,如果不是则抛出异常
if platform.architecture()[0] != '64bit':
raise Exception("System is not 64-bit.")
if isinstance(db_paths, str):
db_paths = [db_paths]
merge_path = os.path.abspath(merge_path) # 合并后的数据库路径
endbs = []
for db_path in db_paths:
if not os.path.exists(db_path):
# raise FileNotFoundError("数据库不存在")
continue
if "MSG" not in db_path and "MicroMsg" not in db_path and "MediaMSG" not in db_path:
# raise FileNotFoundError("数据库不是消息数据库") # MicroMsg实时数据库
continue
endbs.append(os.path.abspath(db_path))
endbs = '" "'.join(list(set(endbs)))
merge_path_base = os.path.dirname(merge_path) # 合并后的数据库路径
# 获取当前文件夹路径
current_path = os.path.dirname(__file__)
real_time_exe_path = os.path.join(current_path, "tools", "realTime.exe")
# 调用cmd命令
cmd = f'{real_time_exe_path} "{key}" "{merge_path}" "{endbs}"'
# os.system(cmd)
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=merge_path_base,
creationflags=subprocess.CREATE_NO_WINDOW)
# p.communicate()
# 查看返回值
out, err = p.communicate()
if out and out.decode("utf-8").find("SUCCESS") >= 0:
wx_core_loger.info(f"合并实时数据库成功{out}")
return True, merge_path
if err:
wx_core_loger.error(f"合并实时数据库失败\n{out}\n{err}")
return False, err
现在我将输入路径修改为了绝对路径,用这段代码试试看看是否可以呢
def merge_real_time_db(key, merge_path: str, db_paths: [str] or str): """ 合并实时数据库消息,暂时只支持64位系统 :param key: 解密密钥 :param db_paths: 数据库路径 :param merge_path: 合并后的数据库路径 :return: """ try: import platform except: raise ImportError("未找到模块 platform") # 判断系统位数是否为64位,如果不是则抛出异常 if platform.architecture()[0] != '64bit': raise Exception("System is not 64-bit.") if isinstance(db_paths, str): db_paths = [db_paths] merge_path = os.path.abspath(merge_path) # 合并后的数据库路径 endbs = [] for db_path in db_paths: if not os.path.exists(db_path): # raise FileNotFoundError("数据库不存在") continue if "MSG" not in db_path and "MicroMsg" not in db_path and "MediaMSG" not in db_path: # raise FileNotFoundError("数据库不是消息数据库") # MicroMsg实时数据库 continue endbs.append(os.path.abspath(db_path)) endbs = '" "'.join(list(set(endbs))) merge_path_base = os.path.dirname(merge_path) # 合并后的数据库路径 # 获取当前文件夹路径 current_path = os.path.dirname(__file__) real_time_exe_path = os.path.join(current_path, "tools", "realTime.exe") # 调用cmd命令 cmd = f'{real_time_exe_path} "{key}" "{merge_path}" "{endbs}"' # os.system(cmd) p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=merge_path_base, creationflags=subprocess.CREATE_NO_WINDOW) # p.communicate() # 查看返回值 out, err = p.communicate() if out and out.decode("utf-8").find("SUCCESS") >= 0: wx_core_loger.info(f"合并实时数据库成功{out}") return True, merge_path if err: wx_core_loger.error(f"合并实时数据库失败\n{out}\n{err}") return False, err
现在我将输入路径修改为了绝对路径,用这段代码试试看看是否可以呢
这样可以了,成功了,感谢。 不过wx_core_loger是啥,好像没有这个函数.
是否可以更新一个小版本呢,修复一下这一块,万分感谢!
wx_core_loger,是我之后版本准备采用的统一日志处理
V3.1.4
问题描述 版本更新后merge_real_time_db无法使用了。以前这个函数接受5个参数,包括开始时间戳和结束时间戳,现在不接受两个时间戳变量了。因此我对自己的代码微调了一下:
复现步骤 merge_real_time_db(wx_info["key"], ”./tmp/MergedDB.db“, ['C:\xx\Msg\MicroMsg.db', 'C:\xx\Msg\Multi\MSG0.db'])
使用如上代码,期望能够将MSG数据库和MicroMsg数据库的实时数据合并到一个大的MergedDB.db文件中
实际行为 并没有报错,也没有成功合并出数据库
环境信息
其他信息 请提供任何与问题相关的其他信息(文字,截图等)。