k2-fsa / sherpa-onnx

Speech-to-text, text-to-speech, speaker diarization, and VAD using next-gen Kaldi with onnxruntime without Internet connection. Support embedded systems, Android, iOS, Raspberry Pi, RISC-V, x86_64 servers, websocket server/client, C/C++, Python, Kotlin, C#, Go, NodeJS, Java, Swift, Dart, JavaScript, Flutter, Object Pascal, Lazarus, Rust
https://k2-fsa.github.io/sherpa/onnx/index.html
Apache License 2.0
3.67k stars 427 forks source link

get connection closed error when sending long file to Streaming Server from online_websocket_client_decode_file #1363

Open Caet-pip opened 2 months ago

Caet-pip commented 2 months ago

Hello I get the following error when sending a long file >50 mins. File format is 16k sampling rate.

File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1289, in close_connection await self.transfer_data_task File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 955, in transfer_data message = await self.read_message() ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1025, in read_message frame = await self.read_data_frame(max_size=self.max_size) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1113, in read_data_frame await self.write_close_frame(self.close_rcvd, frame.data) File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1226, in write_close_frame await self.write_frame(True, OP_CLOSE, data, _state=State.CLOSING) File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1201, in write_frame await self.drain() File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1182, in drain async with self._drain_lock: File "/usr/lib/python3.12/asyncio/locks.py", line 14, in aenter await self.acquire() File "/usr/lib/python3.12/asyncio/locks.py", line 113, in acquire await fut asyncio.exceptions.CancelledError

async for message in socket: File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 494, in aiter yield await self.recv() ^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 564, in recv await self.ensure_open() File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 940, in ensure_open raise self.connection_closed_exc()

csukuangfj commented 2 months ago

could you post complete.logs and describe how to.reproduce it?

Caet-pip commented 2 months ago

Sure,

the audio file I am sending is from a public repo: https://github.com/revdotcom/speech-datasets/blob/main/earnings21/media/4320211.mp3

I converted it to .wav format with 16k sampling rate using ffmpeg

I am sending a 55 plus minute audio file using this command:

python3 online-websocket-client-decode-file.py --server-addr localhost --server-port 6005 --seconds-per-message 0.1 --samples-per-message 16000 ./test_audio/4320211.wav

I start the server using this command:

python3 streming_server_tests.py --tokens=./sherpa-onnx-streaming-zipformer-en-2023-06-21/tokens.txt --encoder=./sherpa-onnx-streaming-zipformer-en-2023-06-21/encoder-epoch-99-avg-1.onnx --decoder=./sherpa-onnx-streaming-zipformer-en-2023-06-21/decoder-epoch-99-avg-1.onnx --joiner=./sherpa-onnx-streaming-zipformer-en-2023-06-21/joiner-epoch-99-avg-1.onnx --port=6005

This is the error log: Traceback (most recent call last): File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1289, in close_connection await self.transfer_data_task File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 955, in transfer_data message = await self.read_message() ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1025, in read_message frame = await self.read_data_frame(max_size=self.max_size) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1100, in read_data_frame frame = await self.read_frame(max_size) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1157, in read_frame frame = await Frame.read( ^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/framing.py", line 68, in read data = await reader(2) ^^^^^^^^^^^^^^^ File "/usr/lib/python3.12/asyncio/streams.py", line 752, in readexactly await self._wait_for_data('readexactly') File "/usr/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data await self._waiter asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "/home/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 196, in asyncio.run(main()) File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) ^^^^^^^^^^^^^^^^ File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete return future.result() ^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 182, in main await run( File "/home/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 159, in run await websocket.send(d) File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 630, in send await self.ensure_open() File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 931, in ensure_open raise self.connection_closed_exc() websockets.exceptions.ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received 2024-09-20 18:52:49,805 ERROR [base_events.py:1821] Task exception was never retrieved future: <Task finished name='Task-5' coro=<receive_results() done, defined at /home/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py:118> exception=ConnectionClosedError(None, Close(code=<CloseCode.INTERNAL_ERROR: 1011>, reason='keepalive ping timeout'), None)> Traceback (most recent call last): File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1289, in close_connection await self.transfer_data_task File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 955, in transfer_data message = await self.read_message() ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1025, in read_message frame = await self.read_data_frame(max_size=self.max_size) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1100, in read_data_frame frame = await self.read_frame(max_size) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1157, in read_frame frame = await Frame.read( ^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/framing.py", line 68, in read data = await reader(2) ^^^^^^^^^^^^^^^ File "/usr/lib/python3.12/asyncio/streams.py", line 752, in readexactly await self._wait_for_data('readexactly') File "/usr/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data await self._waiter asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "/home/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 120, in receive_results async for message in socket: File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 494, in aiter yield await self.recv() ^^^^^^^^^^^^^^^^^ File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 564, in recv await self.ensure_open() File "/home/sherpa-onnx/python-api-examples/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 940, in ensure_open raise self.connection_closed_exc() websockets.exceptions.ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received

csukuangfj commented 2 months ago

I start the server using this command:

python3 streming_server_tests.py

What is inside streming_server_tests.py?

Caet-pip commented 2 months ago

It is the same as streaming_server.py. I had to change the name as I had multiple files. I tried again with the streaming_server.py file from GitHub, and get same error in the online wbesocket client decode file logs:

Traceback (most recent call last): File "/home/azureuser/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 196, in asyncio.run(main()) File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) ^^^^^^^^^^^^^^^^ File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete return future.result() ^^^^^^^^^^^^^^^ File "/home/azureuser/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 182, in main await run( File "/home/azureuser/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 159, in run await websocket.send(d) File "/home/azureuser/sherpa-onnx/python-api-examples/asrenv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 630, in send await self.ensure_open() File "/home/azureuser/sherpa-onnx/python-api-examples/asrenv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 931, in ensure_open raise self.connection_closed_exc() websockets.exceptions.ConnectionClosedError: received 1011 (internal error) keepalive ping timeout; then sent 1011 (internal error) keepalive ping timeout 2024-09-23 11:47:57,270 ERROR [base_events.py:1821] Task exception was never retrieved future: <Task finished name='Task-5' coro=<receive_results() done, defined at /home/azureuser/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py:118> exception=ConnectionClosedError(Close(code=1011, reason='keepalive ping timeout'), Close(code=1011, reason='keepalive ping timeout'), True)> Traceback (most recent call last): File "/home/azureuser/sherpa-onnx/python-api-examples/online-websocket-client-decode-file.py", line 120, in receive_results async for message in socket: File "/home/azureuser/sherpa-onnx/python-api-examples/asrenv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 494, in aiter yield await self.recv() ^^^^^^^^^^^^^^^^^ File "/home/azureuser/sherpa-onnx/python-api-examples/asrenv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 564, in recv await self.ensure_open() File "/home/azureuser/sherpa-onnx/python-api-examples/asrenv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 931, in ensure_open raise self.connection_closed_exc() websockets.exceptions.ConnectionClosedError: received 1011 (internal error) keepalive ping timeout; then sent 1011 (internal error) keepalive ping timeout

csukuangfj commented 2 months ago

Could you add some log messages in your client code and try to find how many samples it has sent before it throws?

Caet-pip commented 2 months ago

I printed logs in streaming server to see how many samples are being received

stream.accept_waveform(sample_rate=self.sample_rate, waveform=samples) print('length of samples received is', len(samples))

Logs show: length of samples received is 16000

I printed logs in online websocket client decode file in the run function

    while start < data.shape[0]:
        end = start + samples_per_message
        end = min(end, data.shape[0])
        d = data.data[start:end].tobytes()
        print("length of sent data", len(d))

Logs show: length of sent data 64000

even before the error the logs show the same in both client and server