eepson123tw / public-notes

some notes about the develop issue and can quickly review
0 stars 0 forks source link

How to build a SSE stream api to frontend #4

Closed eepson123tw closed 2 weeks ago

eepson123tw commented 3 weeks ago

Server-Sent Events (SSE) 是一種基於 HTTP 的技術,允許伺服器持續向客戶端發送事件,主要用於實時應用。以下是 SSE 的一些關鍵特點和使用情境:

基本概念

數據格式

連線管理

與 WebSocket 的比較

特徵 SSE WebSocket
通訊方向 單向(伺服器到客戶端) 雙向
協議 基於 HTTP 自定義協議
數據格式 鍵值對 可自定義格式(如 JSON)
連線管理 伺服器需維護連線列表 連線持久化,無需額外管理
編碼 只支持 UTF-8 可支持多種編碼

總結來說,SSE 是一種適合於需要伺服器主動推送數據的應用場景,但由於其單向通訊和 HTTP 基礎的特性,可能在某些情況下不如 WebSocket 靈活。

Citations: [1] https://datatracker.ietf.org/doc/html/rfc8895 [2] https://www.encora.com/insights/real-time-communication-simplified-a-deep-dive-into-server-sent-events-sse [3] https://stackoverflow.com/questions/63583989/performance-difference-between-websocket-and-server-sent-events-sse-for-chat-r [4] https://en.wikipedia.org/wiki/Server-sent_events [5] https://softwaremill.com/sse-vs-websockets-comparing-real-time-communication-protocols/ [6] https://ably.com/blog/websockets-vs-sse [7] https://apidog.com/blog/websockets-vs-server-sent-events/ [8] https://www.reddit.com/r/ExperiencedDevs/comments/1845vtf/websockets_vs_server_sent_events/ [9] https://www.svix.com/resources/faq/websocket-vs-sse/

eepson123tw commented 2 weeks ago

SSE 的工作原理:

  1. 客戶端發送請求:客戶端通過 EventSource API 向伺服器發送一個長時間連接請求。
  2. 伺服器推送資料:伺服器持續保持連接並使用 text/event-stream 格式將資料以事件流的形式推送到客戶端。
  3. 客戶端接收資料:客戶端收到事件資料後,可以立即處理並顯示,無需再向伺服器發送請求。

簡單範例:

1. 伺服器端 (使用 Python 和 FastAPI):

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time

app = FastAPI()

@app.get("/sse")
async def sse_endpoint():
    def event_stream():
        for i in range(1, 6):
            yield f"data: Hello {i}\n\n"
            time.sleep(1)  # 模擬延遲

    return StreamingResponse(event_stream(), media_type="text/event-stream")

這段程式碼展示了一個簡單的 SSE 伺服器,該伺服器每秒推送一條消息給客戶端,共推送五次。

2. 客戶端 (HTML 和 JavaScript):

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Example</title>
</head>
<body>
    <h1>Server-Sent Events Example</h1>
    <div id="messages"></div>

    <script>
        const evtSource = new EventSource("http://localhost:8000/sse");

        evtSource.onmessage = function(event) {
            const newElement = document.createElement("div");
            newElement.textContent = `Message: ${event.data}`;
            document.getElementById("messages").appendChild(newElement);
        };

        evtSource.onerror = function(event) {
            console.error("SSE Error:", event);
            evtSource.close();  // 當出現錯誤時關閉連接
        };
    </script>
</body>
</html>

這個 HTML 頁面使用 JavaScript 的 EventSource API 與伺服器建立 SSE 連接,並將收到的每條消息顯示在頁面上。

總結:

SSE 是一種簡單且有效的伺服器向客戶端推送即時更新的方式,適合需要伺服器單向推送資料的應用場景。相比於 WebSocket,SSE 的實現更為簡單且適用於單向的即時推送需求。

eepson123tw commented 2 weeks ago

StreamingResponse 是 FastAPI 提供的一種特殊的 HTTP 回應類型,用於在伺服器端生成數據並逐步發送到客戶端,而不是一次性將所有數據生成完後再發送。這在處理大量數據或需要持續發送數據的情況下特別有用,比如實現 Server-Sent Events (SSE)、逐步生成大型文件、音頻/視頻串流等。

StreamingResponse 的用途:

如何使用 StreamingResponse

基本範例:

假設你有一個需要逐步生成的大量數據,並希望將這些數據逐步發送給客戶端:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time

app = FastAPI()

def generate_large_data():
    for i in range(1, 101):
        yield f"data item {i}\n"
        time.sleep(0.1)  # 模擬數據生成的延遲

@app.get("/stream-data")
def stream_data():
    return StreamingResponse(generate_large_data(), media_type="text/plain")

說明:

  1. generate_large_data 函數

    • 這個生成器函數模擬逐步生成數據,每次生成一個數據項,並使用 yield 關鍵字逐個返回。
    • 每次生成數據後模擬延遲 0.1 秒,以展示逐步生成數據的效果。
  2. StreamingResponse 的使用

    • stream_data 路由中,StreamingResponse 被用來包裝 generate_large_data 函數。這樣,FastAPI 會將數據逐步發送給客戶端,而不是一次性傳輸所有數據。
    • media_type 指定了回應的內容類型,這裡設置為 text/plain,因為我們的數據是純文本。
  3. 客戶端的行為

    • 客戶端在請求這個路徑時,不會一次性接收到所有數據,而是會隨著數據生成逐步接收並顯示。

SSE 和 StreamingResponse 結合使用的範例:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def event_stream():
    for i in range(1, 6):
        yield f"data: Message {i}\n\n"
        await asyncio.sleep(1)

@app.get("/sse")
async def sse():
    return StreamingResponse(event_stream(), media_type="text/event-stream")

說明:

總結:

StreamingResponse 是 FastAPI 中的一個強大工具,它允許你在數據生成時立即開始傳輸,對於處理大量數據或實時應用非常有用。當與 SSE 結合使用時,它可以讓伺服器端逐步發送實時事件給客戶端,實現實時通知等功能。

eepson123tw commented 6 days ago

我們也可以停止 streamapi 使用原生 fetch https://github.com/vercel/ai/blob/b93f963462f52a72a59a77619ccff669244fcfe2/packages/ui-utils/src/call-chat-api.ts#L8