ant-design / pro-chat

🤖 Components Library for Quickly Building LLM Chat Interfaces.
https://pro-chat.antdigital.dev
MIT License
649 stars 78 forks source link

🧐[问题] 接收时chunk 出现粘包或半包 #187

Closed LillyChen closed 3 months ago

LillyChen commented 4 months ago

🧐 问题描述

接收时chunk出现粘包或半包怎么处理

粘包:chunk data: {"event":"result","content":"要准确理解"} data: {"event":"result","content":"这个问题"} 半包:chunk data: {"event":"result

💻 示例代码

     const theme = useTheme();
     return (
      <div style={{ background: theme.colorBgLayout }}>
      <ProChat
       request={async (messages: any) => {
        // 正常业务中如下:
        const response = await fetch('url', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json;charset=UTF-8',
          },
          body: JSON.stringify({
            query: messages[messages.length-1].content,
            stream: true,
          }),
        });

        // 确保服务器响应是成功的
        if (!response.ok || !response.body) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }

        // 获取 reader
        const reader = response.body.getReader();
        const decoder = new TextDecoder('utf-8');
        const encoder = new TextEncoder();

        const readableStream = new ReadableStream({
          async start(controller) {
            function push() {
              reader
                .read()
                .then(({ done, value }) => {
                  if (done) {
                    controller.close();
                    return;
                  }

                  const chunk = decoder.decode(value, { stream: true });

                  console.log('chunk', chunk);

                  const chunkSplit = chunk.split('\n\n');

                  for (let i = 0; i < chunkSplit.length; i++) {
                    const chunk = chunkSplit[i];

                    if(chunk){
                      const message = chunk.replace('data: ', '');

                      const parsed = JSON.parse(message);
                      console.log(parsed)

                      const event = parsed.event
                      if(event==='done'){
                        controller.close();
                        return;
                      }

                      const content = parsed.content
                      console.log(content)
                      controller.enqueue(encoder.encode(content));
                    }
                  }
                  push();
                })
                .catch((err) => {
                  console.error('读取流中的数据时发生错误', err);
                  controller.error(err);
                });
            }
            push();
          },
       });
       return new Response(readableStream);
     }}
     />
     </div>
     );
     };

🚑 其他信息

f7a2f8d2167e440ffa2917ad06e884e5

ONLY-yours commented 4 months ago

一般是让后台处理的,python 的话应该可以用 socket 解决?

LillyChen commented 4 months ago

一般是让后台处理的,python 的话应该可以用 socket 解决?

看着有些解决方案是:消息头中包含消息体的长度信息,接收方(前端)根据消息头中的长度信息来正确解析出消息体的内容。这样来解决粘包和流式半包的问题。这样是不是前端要用一个buffer缓冲流先存储一下,看接收完了再输出。

ONLY-yours commented 4 months ago

一般是让后台处理的,python 的话应该可以用 socket 解决?

看着有些解决方案是:消息头中包含消息体的长度信息,接收方(前端)根据消息头中的长度信息来正确解析出消息体的内容。这样来解决粘包和流式半包的问题。这样是不是前端要用一个buffer缓冲流先存储一下,看接收完了再输出。

逻辑上是可以的,但是比较麻烦,到时候判定规则或者前后拼接的逻辑得自己写好就也可以

ONLY-yours commented 3 months ago

先关闭了,看起来已经又一些解法了

XPStone commented 1 month ago

用Buffer解决的粘包,我这边可以正常处理: const readableStream = new ReadableStream({ async start(controller) { let buffer = ''; // 缓冲区用于存储未处理完的消息

                            async function push() {
                                try {
                                    // 拉取响应流中的标志位done与二进制的数据Value
                                    const { done, value } = await reader.read();
                                    console.log("VALUE",value);
                                    if (done) {
                                        if (buffer.length > 0) { // 处理剩余的缓冲区内容
                                            handleChunk(buffer, controller, encoder);
                                        }
                                        controller.close(); // 关闭流
                                        return;
                                    }

                                    buffer += decoder.decode(value, { stream: true });

                                    let boundary = buffer.indexOf('\n'); // 查找缓冲区中的换行符
                                    //循环查找换查找缓冲区的换行符
                                    while (boundary !== -1) {
                                        // 提取一份完整的消息
                                        const completeMessage = buffer.slice(0, boundary).trim();
                                        buffer = buffer.slice(boundary + 1); // 更新缓冲区
                                        // 处理以'data: '开头的消息
                                        if (completeMessage.startsWith('data: ')) {
                                            let completeMessageTemp = completeMessage.replace('data: ', '').trim();
                                            if (completeMessageTemp === '[DONE]') {
                                                controller.close();
                                                return;
                                            }
                                            console.log('This is Message', completeMessageTemp);                                                
                                            handleChunk(completeMessageTemp, controller, encoder);
                                        }
                                        boundary = buffer.indexOf('\n'); // 查找下一个换行符
                                    }

                                    push();
                                } catch (err) {
                                    console.error('读取流中的数据时发生错误', err);
                                    controller.error(err); 
                                }
                            }
                            push(); 
                        }
                    });

                    // 处理消息块
                    function handleChunk(chunk: string, controller: ReadableStreamDefaultController<any>, encoder: TextEncoder) {
                        try {
                            if (!chunk) {
                               throw new Error("No valid JSON found in the message");
                            }
                            const parsed = JSON.parse(chunk); 

                            if (parsed.choices && parsed.choices[0] && parsed.choices[0].delta) {
                                controller.enqueue(encoder.encode(parsed.choices[0].delta.content)); 
                            }
                        } catch (err) {
                            console.error('处理chunk时发生错误', err);
                            controller.error(err);
                        }
                    }