aliyun / alibabacloud-bailian-speech-demo

Sample Repository for the AlibabaCloud Bailian Speech SDK
https://help.aliyun.com/zh/dashscope/developer-reference/speech-understanding-and-synthesis
MIT License

How to forward streaming speech synthesis output locally #13

Closed EvilCalf closed 3 weeks ago

EvilCalf commented 1 month ago

update ui Listening... Recognition open click to start asr event_to_stop has been notified and go to stop... libc++abi: terminating Abort trap: 6

songguocola commented 1 month ago

Please provide your runtime environment.

EvilCalf commented 1 month ago

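> Please provide your runtime environment.

It was a pyaudio issue: pyaudio can't be installed with conda. One more question, if I may. For the CosyVoice streaming API, the dashscope interface docs show that all results are returned through a callback. I'd like to process each returned result and then stream it out over a WebSocket. Could you advise how to do this?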

songguocola commented 1 month ago

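> Please provide your runtime environment.
>
> It was a pyaudio issue: pyaudio can't be installed with conda. One more question, if I may. For the CosyVoice streaming API, the dashscope interface docs show that all results are returned through a callback. I'd like to process each returned result and then stream it out over a WebSocket. Could you advise how to do this?

You should be able to use this as a reference: https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/samples/speech-plus/ai-assistant/demo_server.py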

EvilCalf commented 1 month ago

Thanks. Following that example, I've written a FastAPI WebSocket version, but it has a problem. I'd appreciate it if you could take a look.

import asyncio
import traceback
from queue import Queue

import dashscope
from dashscope.audio.tts_v2 import *
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

dashscope.api_key = ""
model = "cosyvoice-v1"
voice = "longxiaochun"

# Manage WebSocket connections
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

manager = ConnectionManager()

class WSCallback(ResultCallback):
    def __init__(self, websocket: WebSocket):
        self.websocket = websocket

    async def _send_data(self, data: bytes):
        if self.websocket.open:
            await self.websocket.send_bytes(data)
        else:
            print("WebSocket is closed, cannot send data")

    def on_open(self):
        print("websocket is open.")

    def on_complete(self):
        print("speech synthesis task complete successfully.")

    def on_error(self, message: str):
        print(f"speech synthesis task failed, {message}")

    def on_close(self):
        print("websocket is closed.")

    def on_event(self, message):
        print(f"recv speech synthsis message {message}")

    def on_data(self, data: bytes) -> None:
        # send data to websocket
        print("on_data recv speech synthsis data {}".format(len(data)))
        asyncio.run(self.websocket.send_bytes(data))
        print("on_data done")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        synthesizer_callback = WSCallback(websocket=websocket)
        synthesizer = SpeechSynthesizer(
            model="cosyvoice-v1",
            voice="longxiaochun",
            callback=synthesizer_callback,
        )
        while True:
            data = await websocket.receive_text()
            print(data)
            synthesizer.streaming_call(data)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"Error: {e}")
        # print(traceback.format_exc())
    finally:
        synthesizer.streaming_complete()
        manager.disconnect(websocket)

The following error occurs:

INFO:     ('127.0.0.1', 57488) - "WebSocket /ws" [accepted]
INFO:     connection open
流式文本语音合成SDK,
websocket is open.
on_data recv speech synthsis data 1716
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
on_data recv speech synthsis data 2926
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
on_data recv speech synthsis data 2926
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
on_data recv speech synthsis data 2926
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
on_data recv speech synthsis data 2925
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
on_data recv speech synthsis data 2926
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
on_data recv speech synthsis data 836
on_data done
recv speech synthsis message {"header":{"task_id":"edcd3d76138c4e18861eb01c1be8b4d4","event":"result-generated","attributes":{}},"payload":{"output":{"sentence":{"words":[]}},"usage":null}}
可以将输入的文本
合成为语音二进制数据
on_data recv speech synthsis data 2926
websocket closed due to Task <Task pending name='Task-22' coro=<WebSocket.send_bytes() running at /data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/websockets.py:191> cb=[run_until_complete.<locals>.done_cb()]> got Future <Future pending cb=[shield.<locals>._outer_done_callback() at /data/xujd/miniconda3/envs/audio/lib/python3.8/asyncio/tasks.py:902]> attached to a different loop
websocket closed due to websocket closed due to Task <Task pending name='Task-22' coro=<WebSocket.send_bytes() running at /data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/websockets.py:191> cb=[run_until_complete.<locals>.done_cb()]> got Future <Future pending cb=[shield.<locals>._outer_done_callback() at /data/xujd/miniconda3/envs/audio/lib/python3.8/asyncio/tasks.py:902]> attached to a different loop
websocket closed due to websocket closed due to websocket closed due to Task <Task pending name='Task-22' coro=<WebSocket.send_bytes() running at /data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/websockets.py:191> cb=[run_until_complete.<locals>.done_cb()]> got Future <Future pending cb=[shield.<locals>._outer_done_callback() at /data/xujd/miniconda3/envs/audio/lib/python3.8/asyncio/tasks.py:902]> attached to a different loop
INFO:     connection closed
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 244, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)  # type: ignore[func-returns-value]
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 70, in __call__
    return await self.app(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/middleware/errors.py", line 151, in __call__
    await self.app(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/routing.py", line 756, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/routing.py", line 776, in app
    await route.handle(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/routing.py", line 373, in handle
    await self.app(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/routing.py", line 96, in app
    await wrap_app_handling_exceptions(app, session)(scope, receive, send)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/starlette/routing.py", line 94, in app
    await func(session)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/fastapi/routing.py", line 348, in app
    await dependant.call(**values)
  File "/data/xujd/CosyVoice/app.py", line 87, in websocket_endpoint
    synthesizer.streaming_complete()
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/dashscope/audio/tts_v2/speech_synthesizer.py", line 365, in streaming_complete
    self.__send_str(request)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/dashscope/audio/tts_v2/speech_synthesizer.py", line 281, in __send_str
    self.ws.send(data)
  File "/data/xujd/miniconda3/envs/audio/lib/python3.8/site-packages/websocket/_app.py", line 291, in send
    raise WebSocketConnectionClosedException("Connection is already closed.")
websocket._exceptions.WebSocketConnectionClosedException: Connection is already closed.

lengjiayi commented 1 month ago

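Hello. The callback and the API call run on two different threads, so using Python asyncio tasks causes tasks to be submitted to different event loops, which raises this error (especially on Windows).

If you want to forward the streaming audio in real time, you can refer to the new sample project: https://github.com/aliyun/alibabacloud-bailian-speech-demo/tree/develop/v1.0.0/samples/speech-plus/ai-assistant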
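
As a minimal sketch of one way to avoid the cross-loop error, assuming the callback runs on a worker thread: capture the server's event loop once, then hand coroutines back to it with `asyncio.run_coroutine_threadsafe` instead of calling `asyncio.run` inside the callback. Here `fake_send_bytes` and `worker_callback` are hypothetical stand-ins for `websocket.send_bytes` and the SDK's `on_data`; they are not part of dashscope.

```python
import asyncio
import threading

received = []  # chunks the stand-in "websocket" has sent so far


async def fake_send_bytes(data: bytes) -> None:
    # Hypothetical stand-in for `await websocket.send_bytes(data)`.
    received.append(data)


def worker_callback(loop: asyncio.AbstractEventLoop, data: bytes) -> None:
    # Runs on a plain thread, as an SDK callback would.
    # Schedule the coroutine on the loop that owns the connection.
    future = asyncio.run_coroutine_threadsafe(fake_send_bytes(data), loop)
    future.result(timeout=5)  # block this thread until the send has finished


def produce(loop: asyncio.AbstractEventLoop) -> None:
    # Simulates the SDK thread delivering two audio chunks.
    for chunk in (b"a", b"bc"):
        worker_callback(loop, chunk)


async def main() -> list:
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=produce, args=(loop,))
    thread.start()
    # Wait for the thread without blocking the event loop.
    await loop.run_in_executor(None, thread.join)
    return received


result = asyncio.run(main())
print(result)
```

In a FastAPI endpoint, the same idea would mean calling `asyncio.get_running_loop()` inside the handler and passing that loop to the callback object.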

EvilCalf commented 1 month ago

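> Hello. The callback and the API call run on two different threads, so using Python asyncio tasks causes tasks to be submitted to different event loops, which raises this error (especially on Windows).
>
> If you want to forward the streaming audio in real time, you can refer to the new sample project: https://github.com/aliyun/alibabacloud-bailian-speech-demo/tree/develop/v1.0.0/samples/speech-plus/ai-assistant

Thanks, everyone, but I still can't get it to work. This is my server side now: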

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            data = json.loads(data)
            print(data)
            taskQueue = TaskQueue()

            taskQueue.put_text(data["text"], data["is_last"])

            synthesizer_callback = Callback(taskQueue)
            synthesizer = SpeechSynthesizer(
                model="cosyvoice-v1",
                voice="longxiaochun",
                callback=synthesizer_callback,
            )

            while True:
                cur_audio_queue, cur_text_queue = taskQueue.get_cur_queue()
                task_end = False
                cur_audio = b""
                for task in cur_audio_queue:
                    if task[1] == False:
                        cur_audio += task[0]
                    else:
                        task_end = True
                await websocket.send_bytes(cur_audio)
                for task in cur_text_queue:
                    print("task: ", task)
                    if task[1] == False:
                        synthesizer.streaming_call(task[0])
                    else:
                        synthesizer.async_streaming_complete()
                if task_end:
                    break

    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"Error: {e}")
        print(traceback.format_exc())
    finally:
        manager.disconnect(websocket)

Output:

INFO:     ('127.0.0.1', 36944) - "WebSocket /ws" [accepted]
INFO:     connection open
{'text': '流式文本语音合成SDK,', 'is_last': False}
task:  ['流式文本语音合成SDK,', False]
websocket is open.
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
on audio
INFO:     connection closed
speech synthesis task failed, {"header":{"task_id":"6f61b248c7aa4b06bbcffddd6dc3e3a0","event":"task-failed","error_code":"InvalidParameter","error_message":"request timeout after 23 seconds.","attributes":{}},"payload":{}}
websocket is closed.

EvilCalf commented 1 month ago

The callback's on_data was called, but no value was actually put into the queue.

lengjiayi commented 1 month ago

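According to the error in the log, you did not call async_streaming_complete after sending the text, so the server timed out and closed the connection with the error "request timeout after 23 seconds". Please first try running the ai-assistant sample project locally, and make further modifications only once it runs successfully.

Link: https://github.com/aliyun/alibabacloud-bailian-speech-demo/tree/develop/v1.0.0/samples/speech-plus/ai-assistant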

lengjiayi commented 1 month ago

Is the problem with running interface.html? You could post the specific error or join our developer chat group.

EvilCalf commented 1 month ago

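> Is the problem with running interface.html? You could post the specific error or join our developer chat group.

The demo runs fine, but my modified version still has problems. Specifically, after one message has been processed, both cur_text_queue and cur_audio_queue are empty. The callback never fires on_complete, so the final task with the completion flag set to True never arrives, and the loop never finishes.

EvilCalf commented 1 month ago

@app.websocket("/ws")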
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        async for message in websocket.iter_text():
            print(message)
            data = json.loads(message)
            print(data)
            taskQueue = TaskQueue()

            taskQueue.put_text(data["text"], data["is_last"])

            synthesizer_callback = Callback(taskQueue)
            synthesizer = SpeechSynthesizer(
                model="cosyvoice-v1",
                voice="longxiaochun",
                callback=synthesizer_callback,
            )

            while True:
                time.sleep(0.01)
                cur_audio_queue, cur_text_queue = taskQueue.get_cur_queue()
                task_end = False
                cur_audio = b""
                for task in cur_audio_queue:
                    if task[1] == False:
                        cur_audio += task[0]
                    else:
                        task_end = True
                if cur_audio != b"":
                    await websocket.send_bytes(cur_audio)
                for task in cur_text_queue:
                    if task[1] == False:
                        synthesizer.streaming_call(task[0])
                    else:
                        synthesizer.async_streaming_complete()
                if task_end:
                    break

    except Exception as e:
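        print(f"Error: {e}")

lengjiayi commented 1 month ago

Hi. Based on the code you posted and our investigation of the earlier logs for task_id "6f61b248c7aa4b06bbcffddd6dc3e3a0", on_complete was not triggered because cur_text_queue contained no task inside the while loop, so async_streaming_complete was never called.

In the code you provided, nothing writes to taskQueue other than taskQueue.put_text(data["text"], data["is_last"]).
1. If this is the only place that writes to taskQueue, then every message the websocket receives creates a new taskQueue and writes only the current message into it. The taskQueue therefore holds a single task, which appears to be text, i.e. task[1]==False. Please follow the demo project and write the streamed input text into taskQueue from another thread.
2. If you also write to taskQueue asynchronously somewhere else, following the demo project, please post the relevant code for reference.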
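
The point about keeping one queue for the whole connection can be sketched roughly as follows. `SimpleTaskQueue` is a hypothetical stand-in that only borrows the method names used in this thread (`put_text`, `get_cur_queue`); the demo's real TaskQueue may be implemented differently. The `calls` list records where `streaming_call` and `async_streaming_complete` would be invoked.

```python
import threading
import time


class SimpleTaskQueue:
    """Hypothetical stand-in for the demo's TaskQueue; items are [payload, is_last]."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._audio = []
        self._text = []

    def put_text(self, text: str, is_last: bool) -> None:
        with self._lock:
            self._text.append([text, is_last])

    def put_audio(self, audio: bytes, is_last: bool) -> None:
        with self._lock:
            self._audio.append([audio, is_last])

    def get_cur_queue(self):
        # Hand back everything queued so far and reset both lists atomically.
        with self._lock:
            audio, text = self._audio, self._text
            self._audio, self._text = [], []
        return audio, text


def consume(queue: SimpleTaskQueue) -> list:
    # Keep polling the SAME queue until an is_last item arrives.
    calls = []
    while True:
        _, texts = queue.get_cur_queue()
        for text, is_last in texts:
            if is_last:
                calls.append("<complete>")  # async_streaming_complete() would go here
                return calls
            calls.append(text)  # streaming_call(text) would go here
        time.sleep(0.01)


def produce(queue: SimpleTaskQueue) -> None:
    # Simulates incoming messages being written from another thread.
    for text, is_last in [("hello", False), ("world", False), ("", True)]:
        queue.put_text(text, is_last)


queue = SimpleTaskQueue()  # created once, not once per message
producer = threading.Thread(target=produce, args=(queue,))
producer.start()
calls = consume(queue)
producer.join()
print(calls)
```

Because the queue outlives any single message, the consumer eventually sees the item with is_last set and reaches the completion branch.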

EvilCalf commented 1 month ago

Thank you for your help. I've adjusted the code, and it can now receive the data.

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    taskQueue = TaskQueue()
    # Call the speech synthesizer callback
    synthesizer_callback = Callback(taskQueue)

    # Synthesize speech with llm streaming output text, sync call and playback of MP3 audio streams.
    synthesizer = SpeechSynthesizer(
        model="cosyvoice-v1",
        voice="longxiaochun",
        callback=synthesizer_callback,
        format=AudioFormat.PCM_22050HZ_MONO_16BIT,
    )

    async def process_received_data():
        while True:
            cur_audio_queue, cur_text_queue = taskQueue.get_cur_queue()
            task_end = False
            cur_audio = b""
            for task in cur_audio_queue:
                if task[1] == False:
                    cur_audio += task[0]
                else:
                    task_end = True
            await websocket.send_json(
                {"format": "WAV", "sample_rate": 22050, "channels": 1, "bit_depth": 16}
            )
            await websocket.send_bytes(cur_audio)
            for task in cur_text_queue:
                if task[1] == False:
                    synthesizer.streaming_call(task[0])
                    await websocket.send_text(task[0])
                else:
                    synthesizer.async_streaming_complete()
            if task_end:
                break

    try:
        processing_task = asyncio.create_task(process_received_data())
        while True:
            data = await websocket.receive_text()
            print("recv: " + data)
            data = json.loads(data)
            taskQueue.put_text(data["text"], data["is_last"])

            # Check whether this was the last message
            if data["is_last"]:
                # Wait for the processing task to finish
                await processing_task
                break

    except Exception as e:
        print(e)
    finally:
        # Close the WebSocket connection
        await websocket.close()

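However, when I play the bytes forwarded this way with the following code, something is wrong: all I hear is noisy, electronic-sounding audio.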

p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=22050, output=True)
response = await websocket.recv()
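stream.write(response)

lengjiayi commented 1 month ago

First, try appending the received audio to a wav file and check whether the data you receive is correct. Also, it looks like your local websocket service forwards text as well, so the client needs to tell text and audio apart. Finally, we recommend websocket-client on the client side, which avoids the other async issues that coroutine await can introduce.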
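
A rough sketch of the first two suggestions, using only the standard library: binary frames are written to a WAV file as raw PCM, and text frames are set aside instead of being played. It assumes 22050 Hz, mono, 16-bit PCM, matching the AudioFormat.PCM_22050HZ_MONO_16BIT setting in the server code above. `save_audio_frames` and the sample `frames` list are hypothetical; a real client would pass in the messages it receives from the websocket.

```python
import os
import tempfile
import wave


def save_audio_frames(messages, path, sample_rate=22050):
    """Write binary frames to a WAV file and return the text frames separately."""
    texts = []
    with wave.open(path, "wb") as wav:
        wav.setnchannels(1)  # mono
        wav.setsampwidth(2)  # 16-bit samples
        wav.setframerate(sample_rate)
        for message in messages:
            if isinstance(message, (bytes, bytearray)):
                wav.writeframes(message)  # raw PCM audio
            else:
                texts.append(message)  # text or JSON frame, not audio
    return texts


# Hypothetical frames, in the order a client might receive them.
frames = ['{"format": "WAV"}', b"\x00\x00" * 100, "hello", b"\x01\x00" * 50]
wav_path = os.path.join(tempfile.mkdtemp(), "received.wav")
texts = save_audio_frames(frames, wav_path)

with wave.open(wav_path, "rb") as wav:
    n_frames = wav.getnframes()
print(texts, n_frames)
```

If the saved file plays cleanly in an ordinary audio player, the audio bytes themselves are fine, and the noise more likely comes from text or JSON frames being written to the playback stream.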