Closed nomoneynolife closed 2 months ago
认同需求。思路或建议:是否可以对接自己的API,给出对接参数和调用标准。我自己开发API可以先将用户信息做处理,例如:识别用户、鉴别问题,或者做langchain应用查询知识库,再自己调用GPT或Qwen接口,将结果按标准传回
fastgpt,你值得拥有
接入fastgpt知识库就好了
接入fastgpt知识库就好了
收费的哇
接入fastgpt知识库就好了
收费的哇
你自己部署啊
不是有个自带的keyword插件吗,自己改造下,加个功能可以增删改查
def on_handle_context(self, e_context: EventContext):
if e_context["context"].type != ContextType.TEXT:
return
content = e_context["context"].content.strip()
logger.debug("[keyword] on_handle_context. content: %s" % content)
if str(co def on_handle_context(self, e_context: EventContext):
if e_context["context"].type != ContextType.TEXT:
return
content = e_context["context"].content.strip()
logger.debug("[keyword] on_handle_context. content: %s" % content)
if str(content).startswith("delkeyword"):
logger.debug("delkeyword============================================")
content_str = str(content)
key = content_str[len("delkeyword "):]
# 读取config.json文件
with open("/app/plugins/keyword/config.json", "r") as f:
jsonconfig = json.load(f)
logger.debug(jsonconfig)
#添加关键字和值到config中的keyword数组
if key in jsonconfig["keyword"]:
del jsonconfig["keyword"][key]
self.keyword = jsonconfig["keyword"]
# 写回config.json文件
with open("/app/plugins/keyword/config.json", "w", encoding='utf-8') as f:
json.dump(jsonconfig, f, indent=4, ensure_ascii=False)
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = f"关键字 '{key}' 已删除'。"
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑
if str(content).startswith("setkeyword"):
logger.debug("setkeyword============================================")
content_str = str(content)
key_value_pair = content_str[len("setkeyword "):].split(":::")
logger.debug("[keyword] {}".format(key_value_pair))
if len(key_value_pair) == 2:
key = key_value_pair[0].strip()
value = key_value_pair[1].strip()
logger.debug("[key] {}".format(key))
logger.debug("[value] {}".format(value))
# 读取config.json文件
with open("/app/plugins/keyword/config.json", "r") as f:
jsonconfig = json.load(f)
logger.debug(jsonconfig)
#添加关键字和值到config中的keyword数组
jsonconfig["keyword"][key] = value
self.keyword = jsonconfig["keyword"]
# 写回config.json文件
with open("/app/plugins/keyword/config.json", "w", encoding='utf-8') as f:
json.dump(jsonconfig, f, indent=4, ensure_ascii=False)
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = f"关键字 '{key}' 已成功设置为 '{value}'。"
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑
if content in self.keyword:
logger.info(f"[keyword] 匹配到关键字【{content}】")
reply_text = self.keyword[content]
# 判断匹配内容的类型
if (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".img", ".gif"]):
# 如果是以 http:// 或 https:// 开头,且".jpg", ".jpeg", ".png", ".gif", ".img"结尾,则认为是图片 URL。
reply = Reply()
reply.type = ReplyType.IMAGE_URL
reply.content = reply_text
elif (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".pdf", ".doc", ".docx", ".xls", "xlsx",".zip", ".rar"]):
# 如果是以 http:// 或 https:// 开头,且".pdf", ".doc", ".docx", ".xls", "xlsx",".zip", ".rar"结尾,则下载文件到tmp目录并发送给用户
file_path = "tmp"
if not os.path.exists(file_path):
os.makedirs(file_path)
file_name = reply_text.split("/")[-1] # 获取文件名
file_path = os.path.join(file_path, file_name)
response = requests.get(reply_text)
with open(file_path, "wb") as f:
f.write(response.content)
#channel/wechat/wechat_channel.py和channel/wechat_channel.py中缺少ReplyType.FILE类型。
reply = Reply()
reply.type = ReplyType.FILE
reply.content = file_path
elif (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".mp4"]):
# 如果是以 http:// 或 https:// 开头,且".mp4"结尾,则下载视频到tmp目录并发送给用户
reply = Reply()
reply.type = ReplyType.VIDEO_URL
reply.content = reply_text
else:
# 否则认为是普通文本
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = reply_text
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑ntent).startswith("delkeyword"):
logger.debug("delkeyword============================================")
content_str = str(content)
key = content_str[len("delkeyword "):]
# 读取config.json文件
with open("/app/plugins/keyword/config.json", "r") as f:
jsonconfig = json.load(f)
logger.debug(jsonconfig)
#添加关键字和值到config中的keyword数组
if key in jsonconfig["keyword"]:
del jsonconfig["keyword"][key]
self.keyword = jsonconfig["keyword"]
# 写回config.json文件
with open("/app/plugins/keyword/config.json", "w", encoding='utf-8') as f:
json.dump(jsonconfig, f, indent=4, ensure_ascii=False)
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = f"关键字 '{key}' 已删除'。"
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑
if str(content).startswith("setkeyword"):
logger.debug("setkeyword============================================")
content_str = str(content)
key_value_pair = content_str[len("setkeyword "):].split(":::")
logger.debug("[keyword] {}".format(key_value_pair))
if len(key_value_pair) == 2:
key = key_value_pair[0].strip()
value = key_value_pair[1].strip()
logger.debug("[key] {}".format(key))
logger.debug("[value] {}".format(value))
# 读取config.json文件
with open("/app/plugins/keyword/config.json", "r") as f:
jsonconfig = json.load(f)
logger.debug(jsonconfig)
#添加关键字和值到config中的keyword数组
jsonconfig["keyword"][key] = value
self.keyword = jsonconfig["keyword"]
# 写回config.json文件
with open("/app/plugins/keyword/config.json", "w", encoding='utf-8') as f:
json.dump(jsonconfig, f, indent=4, ensure_ascii=False)
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = f"关键字 '{key}' 已成功设置为 '{value}'。"
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑
if content in self.keyword:
logger.info(f"[keyword] 匹配到关键字【{content}】")
reply_text = self.keyword[content]
# 判断匹配内容的类型
if (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".img", ".gif"]):
# 如果是以 http:// 或 https:// 开头,且".jpg", ".jpeg", ".png", ".gif", ".img"结尾,则认为是图片 URL。
reply = Reply()
reply.type = ReplyType.IMAGE_URL
reply.content = reply_text
elif (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".pdf", ".doc", ".docx", ".xls", "xlsx",".zip", ".rar"]):
# 如果是以 http:// 或 https:// 开头,且".pdf", ".doc", ".docx", ".xls", "xlsx",".zip", ".rar"结尾,则下载文件到tmp目录并发送给用户
file_path = "tmp"
if not os.path.exists(file_path):
os.makedirs(file_path)
file_name = reply_text.split("/")[-1] # 获取文件名
file_path = os.path.join(file_path, file_name)
response = requests.get(reply_text)
with open(file_path, "wb") as f:
f.write(response.content)
#channel/wechat/wechat_channel.py和channel/wechat_channel.py中缺少ReplyType.FILE类型。
reply = Reply()
reply.type = ReplyType.FILE
reply.content = file_path
elif (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".mp4"]):
# 如果是以 http:// 或 https:// 开头,且".mp4"结尾,则下载视频到tmp目录并发送给用户
reply = Reply()
reply.type = ReplyType.VIDEO_URL
reply.content = reply_text
else:
# 否则认为是普通文本
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = reply_text
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束,并跳过处理context的默认逻辑
mark一下
你可以仿照bot文件夹里的模型调用方式,自己实现api接口,简单点直接通过http请求访问参考baidu_unit_bot.py的实现方式,在后端你可以部署自己的AI模型,配好请求接口就能用了。我这边试过本地的botpress模型通过请求调用然后发送内容到公众号上,思路是可行的。
⚠️ 搜索是否存在类似issue
总结
有些知识AI肯定没有,对吧
举例
比如,通过企业微信自建接入后: 用户问,大佬,年假是怎么计算的啊。,,,这时AI就不行了(每个公司制度不一样) 我直接自定义回复内容,年假的计算方式为:年休假天数=(当年已在职天数/365天)*当年度年休假总天数 跪求大佬们开发,自定义功能,
动机
有实际需求