memochou1993 / gpt-ai-assistant

OpenAI + LINE + Vercel = GPT AI Assistant
https://memochou1993.github.io/gpt-ai-assistant-docs/
MIT License
7.25k stars 9.36k forks source link

Support message queue #265

Closed mccoysc closed 1 year ago

mccoysc commented 1 year ago

建议利用消息队列将webhook所需http server与bot逻辑分离。 webhook程序全部代码如下:

  1. 该webhook程序以http server方式运行,接收get和post请求。
  2. 将收到的http参数method,path,headers,body解析,且生成一个随机应答id(replyId)后打包为一个json。
  3. 使用mqtt将2的json发布到配置好的topic上。
  4. 以该随机id为topic订阅该topic。
  5. 收到replyId上的消息后,首先取消该replyId的订阅,避免因为消息队列的投递问题重复收到应答。
  6. 解析payload内的http参数(headers,body,statusCode),以这些参数应答之前缓存的res。

## 本程序实际是个 http(webhook)<==>msg queue<==>http(bot程序) 的proxy,好处是只需要webhook有公网域名,bot部分随便部署在哪里都行,比如你的开发电脑上,方便开发调试和部署。

const express = require("express");
const mqtt = require("mqtt");
const http = require("http");
const crypto = require("crypto");

// Delay passed to setTimeout before a pending HTTP response is failed.
const replyTimeout = process.env.REPLY_TIMEOUT || 30000;
// MQTT broker URL; defaults to test.mosquitto.org.
const mqttEndpoint = process.env.MQ_ENDPOINT || "mqtt://test.mosquitto.org";
// Topic on which serialized HTTP requests are published.
const mqttTopic = process.env.MQ_TOPIC || "MQ_ON_HTTTP_REQUEST";
// Shared MQTT client; the connection attempt starts when this module loads.
const mqttClient = mqtt.connect(mqttEndpoint);
// Flipped to true once the MQTT "connect" event has been observed.
let mqttOk = false;

/**
 * Yields the chunks of a stream one by one, for use with `for await`.
 *
 * Accepts either a stream exposing `getReader()` (WHATWG ReadableStream) or
 * any other async-iterable stream such as a Node.js Readable.
 *
 * @param {object|null|undefined} stream - The stream to drain. A null or
 *   undefined stream (for example a response without a body) yields nothing.
 * @yields {*} Every chunk exactly once, in arrival order.
 * @throws Rethrows any error raised by the underlying stream.
 */
async function* streamAsyncIterable(stream) {
    if (stream == null) {
        return;
    }
    if (typeof stream.getReader !== "function") {
        // Node.js Readable streams are async iterable. Delegating to the
        // built-in iterator avoids attaching fresh "data"/"end"/"error"
        // listeners for every chunk, which leaked listeners, could drop chunks
        // emitted between iterations, and produced a trailing `undefined`
        // chunk when the stream ended.
        yield* stream;
        return;
    }
    const reader = stream.getReader();
    try {
        while (true) {
            const { done, value } = await reader.read();
            if (done) {
                return;
            }
            yield value;
        }
    } finally {
        // Always release the lock so the stream can be read or cancelled elsewhere.
        reader.releaseLock();
    }
}

const app = express();
// Pending HTTP responses, keyed by the per-request reply topic (replyId).
const reply_wait_queue = {};
// Framing / hop-by-hop headers that are not copied from the reply: the body is
// re-sent here as a decoded buffer, so Node.js must compute the framing itself.
const REPLY_HEADERS_TO_SKIP = new Set([
    "connection",
    "keep-alive",
    "transfer-encoding",
    "content-length",
    "content-encoding",
]);

/**
 * Handles a reply arriving on a per-request reply topic: unsubscribes from the
 * topic, looks up the HTTP response waiting for it and answers that response
 * with the status code, headers and body carried in the JSON payload.
 *
 * @param {string} topic - Topic the message arrived on (the replyId).
 * @param {Buffer} payload - JSON document with statusCode, httpHeaders,
 *   httpBody and httpBodyEncodeType.
 */
mqttClient.on("message", async function (topic, payload) {
    // The reply topic is one-shot: drop the subscription right away so a
    // redelivered message cannot be handled twice. Retry on transient errors
    // only; retrying on a closed connection would loop forever.
    (function delTopic() {
        mqttClient.unsubscribe(topic, function (err) {
            if (err && err.message && err.message.indexOf("Connection closed") < 0) {
                setTimeout(delTopic, 500);
            }
        });
    })();
    const res = reply_wait_queue[topic];
    delete reply_wait_queue[topic];
    if (!res) return;

    let reply;
    try {
        reply = JSON.parse(Buffer.from(payload).toString());
    } catch (error) {
        res.statusCode = 500;
        return res.end("server internal error");
    }
    // A reply that is not an object, or carries an unusable status code, would
    // otherwise throw below and leave the client without any answer.
    const hasValidStatus = reply !== null && typeof reply === "object"
        && Number.isInteger(reply.statusCode)
        && reply.statusCode >= 100 && reply.statusCode <= 999;
    if (!hasValidStatus) {
        res.statusCode = 500;
        return res.end("server internal error");
    }
    res.statusCode = reply.statusCode;

    // Assigning `res.headers` has no effect on a ServerResponse; headers must
    // be registered one by one through setHeader().
    const replyHeaders = reply.httpHeaders;
    if (replyHeaders !== null && typeof replyHeaders === "object") {
        for (const [name, value] of Object.entries(replyHeaders)) {
            if (REPLY_HEADERS_TO_SKIP.has(name.toLowerCase())) continue;
            try {
                res.setHeader(name, value);
            } catch (error) {
                console.log("skip invalid reply header:", name);
            }
        }
    }
    if (reply.httpBody) {
        return res.end(Buffer.from(reply.httpBody, reply.httpBodyEncodeType));
    }
    res.end();
});
/**
 * Catch-all middleware: serializes the incoming GET/POST request into a JSON
 * message, publishes it on the request topic and parks `res` in
 * reply_wait_queue until a reply arrives on the per-request reply topic or the
 * timeout elapses.
 *
 * Responds 500 when MQTT is not ready, the body cannot be read, the MQTT
 * subscribe/publish fails or no reply arrives in time; 400 for any method
 * other than GET and POST.
 */
app.use(async (req, res, next) => {
    if (!mqttOk) {
        res.statusCode = 500;
        return res.end("invalid server status");
    }
    const method = req.method.toLowerCase();
    if (method !== "get" && method !== "post") {
        res.statusCode = 400;
        return res.end("only support get,post");
    }

    const replyId = crypto.randomBytes(16).toString("hex");
    const msgPayload = { replyTopic: replyId, httpMethod: req.method, httpPath: req.path, httpHeaders: req.headers };
    if (method === "post") {
        try {
            const chunks = [];
            for await (const chunk of streamAsyncIterable(req)) {
                // Ignore empty end-of-stream markers; Buffer.concat only accepts buffers.
                if (chunk) {
                    chunks.push(chunk);
                }
            }
            msgPayload.httpBodyEncodeType = "base64";
            msgPayload.httpBody = Buffer.concat(chunks).toString("base64");
        } catch (error) {
            // Without this the rejection would be unhandled and the client would hang.
            res.statusCode = 500;
            return res.end("get http body error");
        }
    }

    // Drops the one-shot reply subscription, retrying on transient errors.
    const dropReplyTopic = function dropReplyTopic() {
        mqttClient.unsubscribe(replyId, function (err) {
            if (err && err.message && err.message.indexOf("Connection closed") < 0) {
                setTimeout(dropReplyTopic, 500);
            }
        });
    };
    // Answers with a 500 if the request is still waiting for its reply.
    const failPending = (message) => {
        const pending = reply_wait_queue[replyId];
        delete reply_wait_queue[replyId];
        dropReplyTopic();
        if (pending) {
            pending.statusCode = 500;
            pending.end(message);
        }
    };

    const timer = setTimeout(() => failPending("generate response timeout"), replyTimeout);
    res.once("close", () => {
        // The response is finished or the client went away: stop the timer and
        // release anything still registered for this request.
        clearTimeout(timer);
        if (reply_wait_queue[replyId]) {
            delete reply_wait_queue[replyId];
            dropReplyTopic();
        }
    });

    reply_wait_queue[replyId] = res;
    // Publish only after the broker acknowledged the subscription; otherwise a
    // fast reply could be sent before this client is listening for it.
    mqttClient.subscribe(replyId, (subscribeError) => {
        if (subscribeError) {
            failPending("subscribe reply topic error");
            return;
        }
        mqttClient.publish(mqttTopic, JSON.stringify(msgPayload), (publishError) => {
            if (publishError) {
                failPending("publish request error");
            }
        });
    });
})

// Port of the local HTTP server; the first defined environment variable wins.
const listenPort = process.env.PORT || process.env.SERVER_PORT || process.env.APP_PORT || 80;
// Start the HTTP server, then keep `mqttOk` in sync with the MQTT connection
// state so the middleware can reject requests while the broker is unreachable.
http.createServer(app).listen(listenPort, function () {
    console.log(`http server listen at port ${listenPort}`);
    mqttClient.on("connect", function () {
        mqttOk = true;
        console.log("mqtt connect ok");
    });
    mqttClient.on("close", function () {
        mqttOk = false;
    });
    // An "error" event without a listener would be thrown by the EventEmitter.
    mqttClient.on("error", function (error) {
        console.log("mqtt  connect error", error && error.message);
    });
    // The connection may already be up by the time the server is listening,
    // in which case no further "connect" event would be emitted.
    if (mqttClient.connected) {
        mqttOk = true;
        console.log("mqtt connect ok");
    }
});

// Entry point for Aliyun (Alibaba Cloud) serverless functions: replays the
// incoming request against the local HTTP server started by this module and
// copies the local response back onto the platform response object.
let getRawBody;
const fetch_func = (typeof fetch !== typeof undefined ? fetch : require("node-fetch"));
/**
 * @param {object} req - Incoming request; `method`, `headers`, `path` and the
 *   raw body stream are read.
 * @param {object} resp - Platform response; `statusCode`, `setHeader` and
 *   `send` are used.
 * @param {object} context - Unused.
 */
exports.handler = async (req, resp, context) => {
    // Loaded lazily so the dependency is only needed when the handler runs.
    getRawBody = getRawBody || require('raw-body');
    let body;
    try {
        if (req.method.toLocaleLowerCase() === "post") {
            body = await new Promise((resolver, reject) => {
                getRawBody(req, (err, rawBody) => {
                    if (err) {
                        return reject(err);
                    }
                    resolver(rawBody);
                });
            });
        }
    } catch (error) {
        resp.statusCode = 500;
        return resp.send("get http body error");
    }

    // Copy the headers instead of mutating the caller's object; fetch
    // implementations may reject a hop-by-hop Connection header.
    const headers = { ...req.headers };
    delete headers["connection"];
    delete headers["Connection"];
    const fetchOpts = {
        method: req.method,
        headers
    };
    if (body) {
        fetchOpts.body = body;
    }

    let httpRes;
    try {
        httpRes = await fetch_func(`http://127.0.0.1:${listenPort}${req.path}`, fetchOpts);
    } catch (error) {
        resp.statusCode = 500;
        return resp.send("forward request error");
    }
    resp.statusCode = Number(httpRes.status);
    // Declared with const: the bare `[k, v]` form created implicit globals.
    for (const [name, value] of httpRes.headers) {
        resp.setHeader(name, value);
    }
    const chunks = [];
    // A response may carry no body at all.
    if (httpRes.body) {
        for await (const chunk of streamAsyncIterable(httpRes.body)) {
            if (chunk) {
                chunks.push(chunk);
            }
        }
    }
    resp.send(Buffer.concat(chunks));
}

以下是对原bot程序index.js改造后的代码,核心逻辑就是:

  1. 在http server监听成功后,订阅配置好的topic(mqttClient.subscribe(mqttTopic))。
  2. 收到消息后,解析payload内的http参数(method,path,headers,body)。
  3. 以2解析出的参数请求本地127.0.0.1,实现转发原本webhook的http请求的效果。
  4. 将收到的应答包装成json,publish到replyId的topic上,处理结束。
import express from 'express';
import { handleEvents, printPrompts } from '../app/index.js';
import config from '../config/index.js';
import { validateLineSignature } from '../middleware/index.js';
import storage from '../storage/index.js';
import { fetchVersion, getVersion } from '../utils/index.js';
import * as mqtt from "mqtt"
import * as http from "http"
import * as fs from "fs"

/**
 * Yields the chunks of a stream one by one, for use with `for await`.
 *
 * Accepts either a stream exposing `getReader()` (WHATWG ReadableStream) or
 * any other async-iterable stream such as a Node.js Readable.
 *
 * @param {object|null|undefined} stream - The stream to drain. A null or
 *   undefined stream (for example a response without a body) yields nothing.
 * @yields {*} Every chunk exactly once, in arrival order.
 * @throws Rethrows any error raised by the underlying stream.
 */
async function* streamAsyncIterable(stream) {
  if (stream == null) {
    return;
  }
  if (typeof stream.getReader !== "function") {
    // Node.js Readable streams are async iterable. Delegating to the built-in
    // iterator avoids attaching fresh "data"/"end"/"error" listeners for every
    // chunk, which leaked listeners, could drop chunks emitted between
    // iterations, and produced a trailing `undefined` chunk at end of stream.
    yield* stream;
    return;
  }
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        return;
      }
      yield value;
    }
  } finally {
    // Always release the lock so the stream can be read or cancelled elsewhere.
    reader.releaseLock();
  }
}

const app = express();

// Parse JSON request bodies and keep the unparsed text on `req.rawBody`
// (presumably consumed by the signature validation middleware — confirm).
app.use(
  express.json({
    verify(req, _res, rawBuffer) {
      req.rawBody = rawBuffer.toString();
    },
  }),
);

// Root route: redirect to the configured application URL when there is one,
// otherwise answer with a plain 200.
app.get('/', (req, res) => {
  if (!config.APP_URL) {
    res.sendStatus(200);
    return;
  }
  res.redirect(config.APP_URL);
});

// Reports the running version next to the latest fetched version.
app.get('/info', async (req, res) => {
  const versions = {
    currentVersion: getVersion(),
    latestVersion: await fetchVersion(),
  };
  res.status(200).send(versions);
});

// Webhook endpoint: after signature validation, initializes storage and
// processes the posted events; answers 200 on success and 500 on any error.
app.post(
  config.APP_WEBHOOK_PATH,
  validateLineSignature,
  async function handleWebhook(req, res) {
    try {
      await storage.initialize();
      await handleEvents(req.body.events);
      res.sendStatus(200);
    } catch (failure) {
      console.error(failure.message);
      res.sendStatus(500);
    }
    if (config.APP_DEBUG) {
      printPrompts();
    }
  },
);

// When a port is configured, start the HTTP server and bridge MQTT to it:
// every message on the request topic is replayed as an HTTP request against
// the local server, and the HTTP response is published back as JSON on the
// reply topic named in the message.
if (config.APP_PORT) {
  http.createServer(app).listen(config.APP_PORT, async function () {
    const mqttEndpoint = process.env.MQ_ENDPOINT || "mqtt://test.mosquitto.org";
    const mqttTopic = process.env.MQ_TOPIC || "MQ_ON_HTTTP_REQUEST";
    const mqttClient = mqtt.connect(mqttEndpoint);
    const localOrigin = `http://127.0.0.1:${config.APP_PORT}`;

    // An "error" event without a listener would be thrown by the EventEmitter.
    mqttClient.on("error", function (error) {
      console.log("connect mqtt endpoint error:", error && error.message);
    });
    await new Promise(function (resolve) {
      mqttClient.once("connect", resolve);
    });

    // Publishes one HTTP-like reply (status, headers, base64 body) on replyTopic.
    const publishReply = function (replyTopic, statusCode, httpHeaders, bodyBuffer) {
      mqttClient.publish(replyTopic, JSON.stringify({
        statusCode,
        httpHeaders,
        httpBodyEncodeType: "base64",
        httpBody: bodyBuffer.toString("base64")
      }));
    };

    // Decodes the request body of a POST message into a Buffer, or returns
    // undefined when the message carries no body.
    const decodeRequestBody = function (method, request) {
      if (method !== "POST" || request.httpBody === undefined || request.httpBody === null) {
        return undefined;
      }
      if (request.httpBodyEncodeType === "URIComponent") {
        return Buffer.from(decodeURIComponent(request.httpBody));
      }
      if (request.httpBodyEncodeType) {
        return Buffer.from(request.httpBody, request.httpBodyEncodeType);
      }
      // No encoding declared: treat the body as plain text.
      return Buffer.from(request.httpBody);
    };

    mqttClient.subscribe(mqttTopic);
    mqttClient.on("message", async function (topic, payload) {
      if (!payload || !payload.length) {
        return;
      }
      const rawText = Buffer.from(payload).toString();
      let request;
      try {
        request = JSON.parse(rawText);
      } catch (error) {
        console.log("invalid mqtt msg:", rawText);
        return;
      }
      if (request === null || typeof request !== "object") {
        console.log("invalid mqtt msg:", rawText);
        return;
      }
      // Without a reply topic there is nowhere to send an answer to.
      if (typeof request.replyTopic !== "string" || !request.replyTopic) {
        return;
      }
      const replyTopic = request.replyTopic;
      if (!request.httpHeaders || typeof request.httpHeaders !== "object") {
        publishReply(replyTopic, 400, {}, Buffer.from("invalid msg data"));
        return;
      }

      try {
        const method = String(request.httpMethod || "POST").toUpperCase();
        const path = request.httpPath || "/";
        // Work on a copy and drop the hop-by-hop Connection header before
        // handing the headers to fetch.
        const headers = { ...request.httpHeaders };
        delete headers["connection"];
        delete headers["Connection"];
        const fetchOpts = { method, headers };
        const body = decodeRequestBody(method, request);
        if (body) {
          fetchOpts.body = body;
        }

        const httpResponse = await fetch(`${localOrigin}${path}`, fetchOpts);
        const chunks = [];
        // A response may carry no body at all.
        if (httpResponse.body) {
          for await (const chunk of streamAsyncIterable(httpResponse.body)) {
            chunks.push(chunk);
          }
        }
        // Headers iterates as [name, value] pairs; Object.assign on such a
        // pair would only have produced the keys "0" and "1".
        const responseHeaders = Object.fromEntries(httpResponse.headers);
        publishReply(replyTopic, Number(httpResponse.status), responseHeaders, Buffer.concat(chunks));
      } catch (error) {
        // Always answer, so the requester does not have to wait for its timeout.
        console.log("forward mqtt request error:", error && error.message);
        publishReply(replyTopic, 500, {}, Buffer.from("server internal error"));
      }
    });
  });
}

export default app;

`

memochou1993 commented 1 year ago

感謝提供參考。