0x5446 / api4sensevoice

API and WebSocket server for SenseVoice. It includes enhanced features such as VAD detection, real-time streaming recognition, and speaker verification.

Some README and project optimization suggestions for the author #6

Closed · RemSynch closed 2 months ago

RemSynch commented 2 months ago

Hi author, I've already opened two issues about the blank-page-on-launch problem, and neither got resolved. After a series of adjustments on my own I finally got the project running, so I'd like to offer some optimization suggestions here. Making the project friendlier to beginners should earn it more stars. I hope you can make these improvements after reading this (and it would be even better if you merged my code and credited me as a contributor, haha).

1. The certificate and key in the startup command are not mentioned in the README:

python server.py --port 8888 --certfile path_to_your_certfile --keyfile path_to_your_key

Many beginners have no idea what to do when they see this command and get stuck right at this step. I came here from Bilibili, and a few people in the comment section there also asked what to do about these parameters, with no answer. Solution: run

openssl req -x509 -newkey rsa:4096 -keyout keyfile.pem -out certfile.pem -days 365 -nodes

to generate a keyfile.pem and a certfile.pem locally; their paths are what you fill in for the parameters. For example, if the certificate and key were generated in the current directory:

python server.py --port 8888 --certfile ./certfile.pem --keyfile ./keyfile.pem
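
As a quick sanity check (a minimal sketch, not part of the project; it assumes the certfile.pem and keyfile.pem generated above sit in the current directory), Python's standard ssl module can confirm the pair loads before you start the server:

import ssl

# Assumes ./certfile.pem and ./keyfile.pem were created by the openssl command above.
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(certfile="./certfile.pem", keyfile="./keyfile.pem")
print("certificate/key pair loads OK")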

2. Many people, like me, probably found this project while looking for a streaming ASR implementation, so the project should provide a most basic, minimal streaming ASR example. Without one, people who hit the problem I did (both scripts start, but the page stays blank and shows that it cannot connect) may simply give up. I still haven't found a solution to that problem, but I modified the code so it runs directly from the command line without exposing an API, which I think is the simplest possible starting point. If you ran into the same problem, you can try this code as well (note that I changed the model path to a local one, model='./SenseVoiceSmall', and the speaker-verification audio to "speaker/speaker_mine_converted.wav"). Create a new file server_test.py next to server_wss.py and paste in the code below:

import logging

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
from pydantic_settings import BaseSettings
from pydantic import BaseModel, Field
from funasr import AutoModel
import numpy as np
import soundfile as sf
import os
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import sounddevice as sd

class Config(BaseSettings):
    # Speaker verification confidence threshold; a score above this value is treated as the registered speaker
    sv_thr: float = Field(0.2, description="Speaker verification threshold")
    chunk_size_ms: int = Field(100, description="Chunk size in milliseconds")
    sample_rate: int = Field(16000, description="Sample rate in Hz")
    bit_depth: int = Field(16, description="Bit depth")
    channels: int = Field(1, description="Number of audio channels")

config = Config()

emo_dict = {
    "<|HAPPY|>": "😊",
    "<|SAD|>": "😔",
    "<|ANGRY|>": "😡",
    "<|NEUTRAL|>": "",
    "<|FEARFUL|>": "😰",
    "<|DISGUSTED|>": "🤢",
    "<|SURPRISED|>": "😮",
}

event_dict = {
    "<|BGM|>": "🎼",
    "<|Speech|>": "",
    "<|Applause|>": "👏",
    "<|Laughter|>": "😀",
    "<|Cry|>": "😭",
    "<|Sneeze|>": "🤧",
    "<|Breath|>": "",
    "<|Cough|>": "🤧",
}

emoji_dict = {
    "<|nospeech|><|Event_UNK|>": "❓",
    "<|zh|>": "",
    "<|en|>": "",
    "<|yue|>": "",
    "<|ja|>": "",
    "<|ko|>": "",
    "<|nospeech|>": "",
    "<|HAPPY|>": "😊",
    "<|SAD|>": "😔",
    "<|ANGRY|>": "😡",
    "<|NEUTRAL|>": "",
    "<|BGM|>": "🎼",
    "<|Speech|>": "",
    "<|Applause|>": "👏",
    "<|Laughter|>": "😀",
    "<|FEARFUL|>": "😰",
    "<|DISGUSTED|>": "🤢",
    "<|SURPRISED|>": "😮",
    "<|Cry|>": "😭",
    "<|EMO_UNKNOWN|>": "",
    "<|Sneeze|>": "🤧",
    "<|Breath|>": "",
    "<|Cough|>": "😷",
    "<|Sing|>": "",
    "<|Speech_Noise|>": "",
    "<|withitn|>": "",
    "<|woitn|>": "",
    "<|GBG|>": "",
    "<|Event_UNK|>": "",
}

lang_dict = {
    "<|zh|>": "<|lang|>",
    "<|en|>": "<|lang|>",
    "<|yue|>": "<|lang|>",
    "<|ja|>": "<|lang|>",
    "<|ko|>": "<|lang|>",
    "<|nospeech|>": "<|lang|>",
}

emo_set = {"😊", "😔", "😡", "😰", "🤢", "😮"}
event_set = {"🎼", "👏", "😀", "😭", "🤧", "😷", }

def format_str(s):
    for sptk in emoji_dict:
        s = s.replace(sptk, emoji_dict[sptk])
    return s

def format_str_v2(s):
    sptk_dict = {}
    for sptk in emoji_dict:
        sptk_dict[sptk] = s.count(sptk)
        s = s.replace(sptk, "")
    emo = "<|NEUTRAL|>"
    for e in emo_dict:
        if sptk_dict[e] > sptk_dict[emo]:
            emo = e
    for e in event_dict:
        if sptk_dict[e] > 0:
            s = event_dict[e] + s
    s = s + emo_dict[emo]

    for emoji in emo_set.union(event_set):
        s = s.replace(" " + emoji, emoji)
        s = s.replace(emoji + " ", emoji)
    return s.strip()

def format_str_v3(s):
    def get_emo(s):
        return s[-1] if s[-1] in emo_set else None

    def get_event(s):
        return s[0] if s[0] in event_set else None

    s = s.replace("<|nospeech|><|Event_UNK|>", "❓")
    for lang in lang_dict:
        s = s.replace(lang, "<|lang|>")
    s_list = [format_str_v2(s_i).strip(" ") for s_i in s.split("<|lang|>")]
    new_s = " " + s_list[0]
    cur_ent_event = get_event(new_s)
    for i in range(1, len(s_list)):
        if len(s_list[i]) == 0:
            continue
        if get_event(s_list[i]) == cur_ent_event and get_event(s_list[i]) is not None:
            s_list[i] = s_list[i][1:]
        # else:
        cur_ent_event = get_event(s_list[i])
        if get_emo(s_list[i]) is not None and get_emo(s_list[i]) == get_emo(new_s):
            new_s = new_s[:-1]
        new_s += s_list[i].strip().lstrip()
    new_s = new_s.replace("The.", " ")
    return new_s.strip()

sv_pipeline = pipeline(
    task='speaker-verification',
    model='iic/speech_campplus_sv_zh_en_16k-common_advanced',
    model_revision='v1.0.0'
)

# The model path is changed to a local directory; to use the ModelScope download cache instead, comment this block out and uncomment the block below
asr_pipeline = pipeline(
    task=Tasks.auto_speech_recognition,
    model='./SenseVoiceSmall',
    model_revision="master",
    device="cuda:0",
)

# asr_pipeline = pipeline(
#     task=Tasks.auto_speech_recognition,
#     model='iic/SenseVoiceSmall',
#     model_revision="master",
#     device="cuda:0",
# )

model = AutoModel(
    model="fsmn-vad",
    model_revision="v2.0.4",
    disable_pbar=True,
    max_end_silence_time=200,
    speech_noise_thres=0.8
)

# My own speaker-verification (voiceprint) audio; replace it with your own
reg_spks_files = [
    "speaker/speaker_mine_converted.wav"
]

def reg_spk_init(files):
    reg_spk = {}
    for f in files:
        data, sr = sf.read(f, dtype="float32")
        k, _ = os.path.splitext(os.path.basename(f))
        reg_spk[k] = {
            "data": data,
            "sr": sr,
        }
    return reg_spk

reg_spks = reg_spk_init(reg_spks_files)

def process_vad_audio(audio, sv=True, lang="auto"):
    logger.debug(f"[process_vad_audio] process audio(length: {len(audio)})")
    if not sv:
        return asr_pipeline(audio, language=lang.strip())

    hit = False
    for k, v in reg_spks.items():
        res_sv = sv_pipeline([audio, v["data"]], thr=config.sv_thr)
        logger.debug(f"[speaker check] {k}: {res_sv}")
        if res_sv["score"] >= config.sv_thr:
            hit = True

    return asr_pipeline(audio, language=lang.strip()) if hit else None

def real_time_transcription():
    # samples per chunk (indata from sounddevice holds float samples, not raw bytes, so no bit-depth factor is needed)
    chunk_size = int(config.chunk_size_ms * config.sample_rate * config.channels / 1000)

    audio_buffer = np.array([])
    audio_vad = np.array([])
    cache = {}
    last_vad_beg = last_vad_end = -1
    offset = 0

    def callback(indata, frames, time, status):
        nonlocal audio_buffer, audio_vad, cache, last_vad_beg, last_vad_end, offset

        if status:
            logger.warning(f"Warning in audio input stream: {status}")

        # Append the incoming audio samples to the buffer
        audio_buffer = np.append(audio_buffer, indata[:, 0])

        while len(audio_buffer) >= chunk_size:
            chunk = audio_buffer[:chunk_size]
            audio_buffer = audio_buffer[chunk_size:]

            audio_vad = np.append(audio_vad, chunk)
            res = model.generate(input=chunk, cache=cache, is_final=False, chunk_size=config.chunk_size_ms)
            logger.debug(f"vad inference: {res}")
            if len(res[0]["value"]):
                vad_segments = res[0]["value"]
                for segment in vad_segments:
                    if segment[0] > -1:  # speech start
                        last_vad_beg = segment[0]
                    if segment[1] > -1:  # speech end
                        last_vad_end = segment[1]
                    if last_vad_beg > -1 and last_vad_end > -1:
                        logger.debug(f"vad segment: {[last_vad_beg, last_vad_end]}")
                        last_vad_beg -= offset
                        last_vad_end -= offset
                        offset += last_vad_end
                        beg = int(last_vad_beg * config.sample_rate / 1000)
                        end = int(last_vad_end * config.sample_rate / 1000)
                        result = process_vad_audio(audio_vad[beg:end], sv=True, lang="auto")
                        logger.debug(f"[process_vad_audio] {result}")
                        audio_vad = audio_vad[end:]
                        last_vad_beg = last_vad_end = -1

                        if result is not None:
                            print(format_str_v3(result[0]['text']))

    # Capture live audio from the microphone with sounddevice
    with sd.InputStream(samplerate=config.sample_rate, channels=config.channels, callback=callback):
        print("Real-time transcription started...")
        input()  # press Enter to stop the program

# Run the real-time transcription loop
if __name__ == "__main__":
    real_time_transcription()
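
To try it (assuming SenseVoiceSmall has been downloaded to ./SenseVoiceSmall and your registered speaker WAV exists at the path above), run python server_test.py, speak into the default microphone, and press Enter to stop.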

The speaker verification threshold can be adjusted at sv_thr: float = Field(0.2, description="Speaker verification threshold"); I'm currently using 0.2.

3. About the speaker-verification (voiceprint) audio: the file must have a 16 kHz sample rate; you cannot just record something casually and use it as-is, or you'll get a shape-mismatch error. If you're not sure what to do, the following snippet converts the sample rate; then use the converted file as the speaker audio.

from pydub import AudioSegment

# Load the source audio file
audio = AudioSegment.from_file("./speaker/speaker_mine.mp3")

# Convert to mono and adjust the sample rate
audio = audio.set_channels(1)
audio = audio.set_frame_rate(16000)  # resample to 16 kHz

# Save the converted audio file
audio.export("./speaker/speaker_mine_converted.wav", format="wav")
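
As an optional check (a small sketch, not part of the project), you can confirm the converted file is 16 kHz mono with soundfile, which server_test.py above already imports:

import soundfile as sf

# Expect a 16000 Hz sample rate and 1-D (mono) data for the converted file.
data, sr = sf.read("./speaker/speaker_mine_converted.wav")
print(sr, data.ndim)  # should print: 16000 1
assert sr == 16000 and data.ndim == 1
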
summerHearts commented 2 months ago

Using iic/SenseVoiceSmall for streaming recognition, the character error rate is too high; it's not suitable for commercial use.