TBXark / ChatGPT-Telegram-Workers

Deploy your own Telegram ChatGPT bot on Cloudflare Workers with ease.
https://t.me/ChatGPT_Telegram_Workers
MIT License

Implement streaming output with the Telegram editMessageText API #93

Closed Qchsi closed 1 year ago

Qchsi commented 1 year ago

https://twitter.com/wsvn53/status/1633799055167066113 Could this feature be implemented?

TBXark commented 1 year ago

Pretty cool. I just don't know whether using Workers this way would exceed the per-invocation execution time limit.

mithew commented 1 year ago

On a web page this works well: the page doesn't shift, the text just keeps extending downward, it's easy on the eyes, and it looks impressively high-tech.

In Telegram, though, the screen jumps every time a new bit of text appears, which is painful (I've tried it in other projects).

TBXark commented 1 year ago
(screenshot)

After a quick attempt I found it times out easily, so for now I don't plan to implement this feature.

TBXark commented 1 year ago

Since Workers are limited, you can try my other project https://github.com/TBXark/chat-bot-go, which can handle this use case in a simple way.

wsvn53 commented 1 year ago

It can be done. I'm the author of that tweet, and my implementation runs on CF Workers 😂

TBXark commented 1 year ago

> It can be done. I'm the author of that tweet, and my implementation runs on CF Workers 😂

How exactly did you implement it? When I tried writing it, it timed out.
wsvn53 commented 1 year ago

I'll put together some of the key code for your reference a bit later.

TBXark commented 1 year ago

> I'll put together some of the key code for your reference a bit later.

I think I've found a way. Seeing that someone had pulled it off, I went back and dug into it 😂

TBXark commented 1 year ago

> I'll put together some of the key code for your reference a bit later.

(screenshot)

But anything a bit longer still times out, so I feel it's still not usable.

TBXark commented 1 year ago

Here is my test code:


const config = {
  apiKey: "sk-"
}

async function test() {
  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${config.apiKey}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      stream: true,
      model: 'gpt-3.5-turbo',
      messages: [{ role: 'user', content: 'How do I invert a binary tree in Go?' }],
    }),
  });
  // TextDecoderStream already yields strings, so no second TextDecoder is needed.
  const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
  if (!reader) return;
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    console.log(value);
  }
}

export default {
  async fetch(request, env) {
    await test();
    return new Response('Hello world');
  },
};
wsvn53 commented 1 year ago

Here is my core implementation. I originally planned to clean it up and open-source it, but I really don't have the time, so it's great that the repo owner is implementing it for everyone:

The core idea is to buffer reads through a BYOB reader to lower the frequency of decoding and callbacks, which reduces how often the CPU time limit is hit. This approach covers roughly 90% of cases; the rest can be rescued with a retry-mode design: Telegram retries a failed call, so you can mark each request in KV the first time and detect the retry.

async function invokeOpenAIAPI(params, onStreamData) {
    // request openai api
    const url = 'https://api.openai.com/v1/chat/completions';
    let retryMode = params.retry;
    delete params.retry;

    console.log('OpenAI params:', JSON.stringify(params));
    let response = await fetch(url, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + envs.OPENAI_TOKEN,
        },
        body: JSON.stringify(Object.assign({}, OPENAI_PARAMS, params)),
    });

    var contentFull = '';

    // Note: retryMode is a fallback for when extremely long content hits the CPU time limit.
    // It degrades the call to a non-streaming request that returns in one piece; adapt this logic as needed.
    if (retryMode) {
        contentFull = await response.text();
        console.log('In retry mode:', contentFull);
        let content = extractContentFromStreamData(contentFull);
        return content.content;
    }

    let reader = response.body.getReader({ mode: 'byob' });
    var data = { done: false };
    const decoder = new TextDecoder('utf-8');
    var pendingText = '';
    while (data.done == false) {
        // Note: reading through a buffer avoids polling too often; too-frequent decodes and callbacks burn CPU time.
        // After several rounds of testing, this buffer size rarely times out and covers roughly 90% of cases;
        // exceptionally large outputs can be rescued by the retry mode described above.
        data = await reader.readAtLeast(4096, new Uint8Array(5000));
        const text = decoder.decode(data.value);
        pendingText = pendingText + text;
        let content = extractContentFromStreamData(pendingText);
        pendingText = content.pending;
        contentFull = contentFull + content.content;
        if (data.done == false) {
            await onStreamData(contentFull);
        }
    }

    return contentFull;
}
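The snippet above leaves onStreamData up to the caller, and calling Telegram's editMessageText on every chunk both burns CPU time and risks rate limiting. The following is a hypothetical sketch, not code from either project: the bot token, the chat/message ids, and the one-second interval are all assumptions.

```javascript
// Hypothetical throttled onStreamData: edits the Telegram message at most
// once per interval, so a fast stream doesn't flood editMessageText.
function makeThrottledEditor(botToken, chatId, messageId, intervalMs = 1000) {
  let lastEdit = 0;
  let lastText = '';
  return async function onStreamData(contentFull) {
    const now = Date.now();
    if (now - lastEdit < intervalMs || contentFull === lastText) {
      return; // too soon, or nothing new: skip this edit
    }
    lastEdit = now;
    lastText = contentFull;
    await fetch(`https://api.telegram.org/bot${botToken}/editMessageText`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ chat_id: chatId, message_id: messageId, text: contentFull }),
    });
  };
}
```

After the stream finishes, one final unconditional edit would still be needed so the last partial text is replaced by the complete answer.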
TBXark commented 1 year ago

Thanks. There's really no good way around the timeout itself. The biggest drawback of storing state in KV is that it burns through write quota, so it's only fine for low-frequency use. I'll check whether any flag changes when Telegram retries a request.
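One such flag is implicit in Telegram's retry behavior: a webhook retry redelivers the same update with the same update_id, so marking the id in KV on first sight identifies the retry. This is a hypothetical illustration (the KV binding name and the TTL are assumptions), and each update still costs one KV write, in line with the write-quota concern above.

```javascript
// Hypothetical retry detector: Telegram re-sends a failed webhook update
// with the same update_id, so a previously marked id means this is a retry.
async function isRetry(env, update) {
  const key = `seen:${update.update_id}`;
  if ((await env.KV.get(key)) !== null) {
    return true; // redelivery: switch the OpenAI call to retry mode
  }
  // First delivery: mark it, with a TTL so old keys expire on their own.
  await env.KV.put(key, '1', { expirationTtl: 600 });
  return false;
}
```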

wsvn53 commented 1 year ago

One more note: the CF Workers test page times out much more easily than real deployments. My guess is that debugging adds instrumentation overhead, so a timeout in the browser debug UI doesn't mean it will time out in production.

With the logic above, my tests show that even a long passage like the following streams correctly:

As an AI language model, I cannot write or upload applications. However, I can provide a simple example of how to receive file uploads in Golang.

Here's an example using the Gorilla Mux router and the standard built-in Go HTTP package to handle file uploads:

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"

	"github.com/gorilla/mux"
)

func fileHandler(w http.ResponseWriter, r *http.Request) {
	file, header, err := r.FormFile("file")
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	defer file.Close()

	filename := header.Filename
	fmt.Println("File name:", filename)

	dir, _ := os.Getwd()
	path := filepath.Join(dir, "uploads", filename)
	fmt.Println("File destination:", path)

	newFile, err := os.Create(path)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer newFile.Close()

	if _, err := io.Copy(newFile, file); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	w.Write([]byte("File uploaded successfully."))
}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/upload", fileHandler).Methods("POST")
	http.ListenAndServe(":8080", r)
}

This code listens for a file upload to the "/upload" endpoint via a POST request, reads the file data, and saves it to a local directory named "uploads". The uploaded file is written with its original filename. The example logs the filename and file destination URL to the console and returns a success message to the client in the HTTP response.

TBXark commented 1 year ago

@wsvn53 How exactly did you implement extractContentFromStreamData? I implemented it with a regex and I'm not sure whether that's too expensive.

function extractContentFromStreamData(stream) {
  let matches = stream.match(/data:\s*({[\s\S]*?})(?=\s*data:|$)/g);
  let remainingStr = stream;
  let contentStr = '';
  matches?.forEach((match) => {
    try {
      let matchStartIndex = remainingStr.indexOf(match);
      let jsonStr = match.substr(6);
      console.log(jsonStr);
      let jsonObj = JSON.parse(jsonStr);
      contentStr += jsonObj.choices[0].delta?.content || '';
      remainingStr = remainingStr.slice(matchStartIndex + match.length);
    } catch (e) {
      console.error(e);
    }
  });
  return {
    content: contentStr,
    pending: remainingStr,
  };
}
TBXark commented 1 year ago

The dev branch now has this feature; set STREAM_MODE to true to try it. A safe mode that guards against timeouts will be added later.
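For reference, assuming the variable is wired through wrangler.toml like the project's other settings (an assumption on my part; check the repo's configuration docs), enabling it would look something like:

```toml
# Illustrative only: enable streaming responses via an environment variable.
[vars]
STREAM_MODE = "true"
```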

wsvn53 commented 1 year ago

> @wsvn53 How exactly did you implement extractContentFromStreamData? I implemented it with a regex and I'm not sure whether that's too expensive.

I simply split the stream into lines and take whatever follows data: at the start of each line, which keeps the computation small. Then I try JSON.parse on it; if that throws, I ignore the error and keep the fragment until more data arrives.

TBXark commented 1 year ago

@wsvn53 Right, as long as each JSON object sits on a single line, that works fine.

function extractContentFromStreamData(stream) {
  const lines = stream.split('\n');
  let remainingStr = '';
  let contentStr = '';
  for (const line of lines) {
    try {
      if (line.startsWith('data:')) {
        const data = JSON.parse(line.substring(5));
        contentStr += data.choices[0].delta?.content || '';
      }
    } catch (e) {
      // An unparsable line is usually a truncated JSON object: keep it for the next chunk.
      remainingStr += (remainingStr !== '' ? '\n' : '') + line;
    }
  }
  return {
    content: contentStr,
    pending: remainingStr,
  };
}
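As a quick sanity check, the line-based parser can be exercised against a hand-made chunk that ends mid-object. The function is repeated here only so the snippet runs on its own, and the payloads are minimal made-up examples of OpenAI's stream format:

```javascript
// Copy of extractContentFromStreamData from above, so this snippet is standalone.
function extractContentFromStreamData(stream) {
  const lines = stream.split('\n');
  let remainingStr = '';
  let contentStr = '';
  for (const line of lines) {
    try {
      if (line.startsWith('data:')) {
        const data = JSON.parse(line.substring(5));
        contentStr += data.choices[0].delta?.content || '';
      }
    } catch (e) {
      remainingStr += (remainingStr !== '' ? '\n' : '') + line;
    }
  }
  return { content: contentStr, pending: remainingStr };
}

// A chunk whose last line is truncated mid-JSON: complete lines are decoded,
// the truncated one comes back in `pending` for the next read.
const chunk =
  'data: {"choices":[{"delta":{"content":"Hel"}}]}\n' +
  'data: {"choices":[{"delta":{"content":"lo"}}]}\n' +
  'data: {"choices":[{"delta":{"con';

const result = extractContentFromStreamData(chunk);
console.log(result.content); // "Hello"
console.log(result.pending); // 'data: {"choices":[{"delta":{"con'
```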