midwayjs / midway

🍔 A Node.js Serverless Framework for front-end/full-stack developers. Build the application for next decade. Works on AWS, Alibaba Cloud, Tencent Cloud and traditional VM/Container. Super easy integrate with React and Vue. 🌈
https://www.midwayjs.org/
MIT License
7.34k stars 573 forks source link

功能请求:希望标准项目可以实现流式响应 #3282

Open quantstu opened 11 months ago

quantstu commented 11 months ago

描述

我们希望在标准项目中实现流式响应的功能,以便满足特定应用场景的需求。流式响应允许服务器持续向客户端发送数据,而不需要客户端不断地进行轮询请求,这在某些实时通信和事件推送应用中非常有用。

背景

最近,我们的项目需要与大型语言模型进行交互,以获取实时生成的文本数据。这种交互需要使用 Server-Sent Events (SSE) 协议,其中 content-type 设置为 text/event-stream。在我们的项目中,我们选择使用 Koa 来实现 SSE,具体实现如下:

// 发送消息
const sendMessage = async (stream) => {
  const data = [
    '现在科学技术的发展速度叫人惊叹',
    '同样在数码相机的技术创新上',
    '随着数码相机越来越普及',
    '数码相机现已成为大家生活中不可缺少的电子产品',
    '而正是因为这样,技术的创新也显得尤为重要',
  ];

  // 循环上面数组: 推送数据、休眠 2 秒
  for (const value of data) {
    stream.write(`data: ${value}\n\n`); // 写入数据(推送数据)
    await new Promise((resolve) => setTimeout(resolve, 2000));
  }

  // 结束流
  stream.end();
};

router.get('/demo', async (ctx) => {
  // 1. 设置响应头
  ctx.set({
    'Connection': 'keep-alive',
    'Cache-Control': 'no-cache',
    'Content-Type': 'text/event-stream', // 表示返回数据是个 stream
  });

  // 2. 创建流、并作为接口数据进行返回
  const stream = new PassThrough();
  ctx.body = stream;
  ctx.status = 200;

  // 3. 推送流数据
  sendMessage(stream, ctx);
});

请求

为了实现流式响应的功能,我们建议在标准项目中提供相应的支持和工具,以便开发人员可以更轻松地创建类似的 SSE 端点。这将使标准项目更加灵活,并能够满足更多实时应用的需求。

预期行为

我们期望标准项目中的流式响应功能包括以下要点:

  1. 能够设置响应头,包括必要的 SSE 头部信息(如 Connection、Cache-Control、Content-Type)。
  2. 能够创建一个可写流对象,作为响应的主体。
  3. 能够轻松地向流中写入数据,以实现数据的持续推送。
  4. 能够在需要时结束流,以通知客户端数据传输的结束。

这些功能将使开发人员能够更方便地实现 SSE 以及其他流式响应协议,从而增强项目的实时通信能力。

相关文档

(以上部分内容由AI协助生成)

czy88840616 commented 11 months ago

直接引用 koa-sse 即可。

quantstu commented 11 months ago

在Next.js中可以直接将ReadableStream作为一个响应,我想在Midway中实现这样的功能好像不行

Next.js

// app/api/sse/route.ts
export async function GET() {
  const encoder = new TextEncoder()
  const stream = new ReadableStream({
    async start(controller) {
      const data = [
        '渐进式设计',
        '提供从基础到入门再到企业级的升级方案',
        '解决应用维护与拓展性难题',
      ]
      while (data.length) {
        const chunk = `data: ${JSON.stringify({ content: data.shift() })}\n\n`
        console.log(chunk)
        controller.enqueue(encoder.encode(chunk))
        await new Promise(resolve => setTimeout(resolve, 100))
      }
      controller.close()
    },
  })
  return new Response(stream)
}

Response Raw

data: {"content":"渐进式设计"}

data: {"content":"提供从基础到入门再到企业级的升级方案"}

data: {"content":"解决应用维护与拓展性难题"}

Midway

// src/controller/sse.controller.ts
import { Controller, Get, Inject } from '@midwayjs/core'
import type { Context } from '@midwayjs/koa'

@Controller('/api')
export class APIController {
  @Inject()
  ctx: Context

  @Get('/sse')
  async createChatCompletion() {
    const encoder = new TextEncoder()
    const stream = new ReadableStream({
      async start(controller) {
        const data = [
          '渐进式设计',
          '提供从基础到入门再到企业级的升级方案',
          '解决应用维护与拓展性难题',
        ]
        while (data.length) {
          const chunk = `data: ${JSON.stringify({ content: data.shift() })}\n\n`
          console.log(chunk)
          controller.enqueue(encoder.encode(chunk))
          await new Promise(resolve => setTimeout(resolve, 100))
        }
        controller.close()
      },
    })
    return stream
  }
}

Response Raw

{}

Logs

[16:19:48] Node.js server restarted in 1468 ms

data: {"content":"渐进式设计"}

2023-09-25 16:20:02,889 INFO 3312 [-/::1/-/214ms GET /api/sse] Report in "src/middleware/report.middleware.ts", rt = 4ms      
data: {"content":"提供从基础到入门再到企业级的升级方案"}

data: {"content":"解决应用维护与拓展性难题"}
leemotive commented 11 months ago

我们用的 Readable 在 midway 中实现流式响应


const read = new Readable();  // node 内部 stream 模块
read._read = () => {};
read.push(streamData.content);   // 异常调用返回一次数据
read.push(null);  // 结束的时候 push 一个 null

return read;  // 以 read 对象作为响应
tmhulw commented 11 months ago

我们用的 Readable 在 midway 中实现流式响应

const read = new Readable();  // node 内部 stream 模块
read._read = () => {};
read.push(streamData.content);   // 异常调用返回一次数据
read.push(null);  // 结束的时候 push 一个 null

return read;  // 以 read 对象作为响应

大兄弟,贴个完整代码参考下

czy88840616 commented 11 months ago

标准项目一直都是可以流式响应的。

ctx.res.send(xxx) / ctx.res.end() 即可。

czy88840616 commented 10 months ago

这个要具体看sse这个模块里面的逻辑了,你这么引用,所有的请求是都会过它的。

pionear-layen commented 9 months ago

有完整案例看看吗,大佬们

WillieChan2015 commented 9 months ago

+1,有无完整例子可以参考看看

czy88840616 commented 9 months ago

我刚好写了一个。。。

 @Get('/')
  async home(): Promise<any> {
    this.ctx.status = 200;
    this.ctx.set('Transfer-Encoding', 'chunked');
    for (let i = 0; i < 100; i++) {
      await sleep(100);
      this.ctx.res.write('abc'.repeat(100));
    }

    this.ctx.res.end();
  }
pionear-layen commented 9 months ago

这个也可以https://github.com/JarvisPrestidge/koa-event-stream.git

hjane commented 6 months ago

有一个方法可以实现。 我在尝试中安装了 koa-sse-stream 因为它不是标准的 midwayjs 组件,所以我在configuration.ts中直接配置无效 @Configuration({ imports: [ koa ]})

如果用 this.app.use(sse({ maxClients: 5000, pingInterval: 30000 })); 确实会生效,效果非常完美,但是我其他的路由都被影响了。

我绕了一圈找现成可用的方式,最后还是回到这个有过一点成果的尝试上。 这个插件封装的很简洁,示例写的其实很完整

image

参考插件代码,我实现了我要的效果

首先把sse.js 拿出来,当作一个服务文件

image

可以看到我几乎是复制过来的

然后写一个StreamMiddleware中间件 只有以/stream开头的路由会执行这个中间件

import { Middleware, IMiddleware } from '@midwayjs/core';
import { NextFunction, Context } from '@midwayjs/koa';
import { SSETransform } from '../service/sse.service';
// const Stream = require('stream');
const DEFAULT_OPTS = {
  maxClients: 10000,
  pingInterval: 60000,
  closeEvent: 'close',
};
@Middleware()
export class StreamMiddleware implements IMiddleware<Context, NextFunction> {
  resolve() {
    return async (ctx: Context, next: NextFunction) => {
      // 控制器前执行的逻辑
      // const startTime = Date.now();
      // 执行下一个 Web 中间件,最后执行到控制器
      // 这里可以拿到下一个中间件或者控制器的返回值
      if (ctx.res.headersSent) {
        if (!(ctx.sse instanceof SSETransform)) {
          console.error(
            'SSE response header has been send, Unable to create the sse response'
          );
        }
        return await next();
      }
      const sse = new SSETransform(ctx, DEFAULT_OPTS);
      sse.on('close', () => {
        console.log('close');
      });
      ctx.sse = sse;
      await next();
      if (ctx.sse) {
        if (!ctx.body) {
          ctx.body = ctx.sse;
        } else {
          if (!ctx.sse.ended) {
            ctx.sse.send(ctx.body);
          }
          ctx.body = sse;
        }
      }
    };
  }

  match(ctx) {
    return ctx.path.indexOf('/stream/') !== -1;
  }
}

然后在configuration.ts文件中应用中间件 import { StreamMiddleware } from './middleware/stream.middleware'; this.app.useMiddleware([StreamMiddleware, ReportMiddleware]);

最后

image

以上就是我实现的全部流程。

===============

捂脸 不用以上这么麻烦,我没仔细看api 只在路由上加载中间件就可以了

image
sy05514 commented 4 months ago

我是这样可以实现
@Get('/index') async index(): Promise { this.ctx.status = 200; this.ctx.set('Content-Type', 'text/event-stream'); this.ctx.set('Cache-Control', 'no-cache, no-transform'); this.ctx.set('Connection', 'keep-alive'); this.ctx.set('X-Accel-Buffering', 'no'); this.ctx.res.write('data: {"status":200}' + '\n\n'); // 创建一个不会被解决的 Promise,防止 Koa 自动结束响应 return new Promise((resolve, reject) => { // 设置一个定时器来定期发送数据到客户端 const intervalId = setInterval(() => { this.ctx.res.write('data: {"message":"Heartbeat"}\n\n'); }, 60000); // 每60秒发送一次 // 监听连接关闭事件,清理资源 this.ctx.req.on('close', () => { clearInterval(intervalId); resolve(1); }); this.ctx.req.on('error', () => { clearInterval(intervalId); reject(new Error('SSE connection error')); }); }); }

hjane commented 4 months ago

这是来自QQ邮箱的自动回复邮件。您好,您的邮件已经收到,我会在一周内尽快给您回复!

gooin commented 1 month ago

我在用langgraph,一开始参照上面各位大佬的方案,直接写一个中间件调用 https://github.dev/JarvisPrestidge/koa-event-stream/ 的sse.ts 但是有问题,graph.streamEvents接收的参数结束完了才会统一发event,流不起来。最后结合上面各位大佬的方法和上面的repo综合了一下,搞定了,希望能给后面遇到坑的小伙伴们参考。

stream.middlewate.ts

import { IMiddleware } from '@midwayjs/core';
import { Middleware } from '@midwayjs/decorator';
import { Context, NextFunction } from '@midwayjs/koa';

class SSE {
    ctx: Context
    constructor(ctx: Context) {
        ctx.status = 200;
        ctx.set('Content-Type', 'text/event-stream');
        ctx.set('Cache-Control', 'no-cache');
        ctx.set('Connection', 'keep-alive');
        ctx.res.writeHead(200, {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Transfer-Encoding': 'chunked'
        });
        ctx.res.flushHeaders();
        this.ctx = ctx;
    }
    send(data: any) {
        // string
        if (typeof data === "string") {
            this.push(`data: ${data}\n\n`);
        }
        // object data
        if (data.id) {
            this.push(`id: ${data.id}\n`);
        }
        if (data.event) {
            this.push(`event: ${data.event}\n`);
        }
        const text = typeof data.data === "object"
            ? JSON.stringify(data.data)
            : data.data;
        this.push(`data: ${text}\n\n`);
    }
    push(data: any) {
        this.ctx.res.write(data);
        this.ctx.res.flushHeaders();
    }
    close() {
        this.ctx.res.end();
    }
}

@Middleware()
export class StreamMiddleware implements IMiddleware<Context, NextFunction> {

    resolve() {
        return async (ctx: Context, next: NextFunction) => {

            if (ctx.res.headersSent) {
                if (!ctx.sse) {
                    console.error('[sse]: response headers already sent, unable to create sse stream');
                }
                return await next();
            }

            const sse = new SSE(ctx);
            ctx.sse = sse;
            await next();

            if (!ctx.body) {
                ctx.body = ctx.sse;
            } else {
                ctx.sse.send(ctx.body);
                ctx.body = sse;
            }
        };
    }

}

controller.ts 这里结合了一些我们业务的,按需把需要的东西挑出来用吧。

import { StreamMiddleware } from './middleware/stream.middleware';
import { HumanMessage } from "@langchain/core/messages";
import { Body, Controller, Inject, Post } from '@midwayjs/decorator';
import { ApiBody, ApiOperation, ApiTags } from '@midwayjs/swagger';
import 'dotenv/config';
import { IChatData } from './interface';
import { initGraph } from './langgraph/graph';
import { LangChainService } from './service';

@ApiTags(['langchain'])
@Controller('/api/ai/langchain', { middleware: [StreamMiddleware] })
export class LangChainServiceController {
    @Inject()
    service: LangChainService;

    @Post('/chat')
    @ApiOperation({
        summary: '智能对话',
    })
    @ApiBody({})
    async chat(@Body() data: IChatData) {
        const graph = await initGraph();
        const streamResults = graph.streamEvents(
            {
                messages: [
                    new HumanMessage({
                        content: '你是谁,你能干什么',
                    }),
                ],
            },
            {
                recursionLimit: 10,
                version: 'v2'
            },
        );
        try {
            for await (const output of streamResults) {
                console.log("-----\n");
                const result = JSON.stringify(output);
                console.log(result);
                this.ctx.sse.send(output)
                console.log("-----\n");
            }
            console.log("----close----" + new Date().toLocaleTimeString());
            this.ctx.sse.close();  // 结束连接
        } catch (error) {
            console.error('Stream processing error:', error);
            this.ctx.sse.close(); // 出错时关闭SSE连接
        }
    }
}
czy88840616 commented 1 month ago

按照 #3982 的设计,可能是

async chat(@Body() data: IChatData) {
    const graph = await initGraph();
    const sse = new ServerResponse().sse();
    const streamResults = graph.streamEvents(
        {
            messages: [
                new HumanMessage({
                    content: '你是谁,你能干什么',
                }),
            ],
        },
        {
            recursionLimit: 10,
            version: 'v2'
        },
    );
    try {
        for await (const output of streamResults) {
            sse.send(output)
        }
        sse.sendEnd();
    } catch (error) {
        console.error('Stream processing error:', error);
        sse.sendEnd();
    }
    return sse;
}
gooin commented 1 month ago

@czy88840616 收到,期待pr合并到主版本后使用官方方案,谢谢大佬!