gofiber / fiber

⚑️ Express inspired web framework written in Go
https://gofiber.io
MIT License
33.68k stars 1.65k forks source link

πŸ€— [Question]: Setup of SSE Fiber with fasthttp.StreamWriter - event source is pending / never connects ... #2837

Open michealroberts opened 8 months ago

michealroberts commented 8 months ago

Question Description

Versions:

Go 1.21.5 github.com/gofiber/fiber/v2 v2.52.0 github.com/valyala/fasthttp v1.51.0


Issue

I have the following logic inside of an SSE handler:

// Peak at the incoming Accept request header:
accept := c.Request().Header.Peek("Accept")

// Check whether the Accept header is set to text/event-stream:
if c.Accepts("text/event-stream") == "text/event-stream" && strings.Contains(string(accept), "text/event-stream") {
  ctx := c.Context()

  ctx.SetContentType("text/event-stream")

  ctx.Response.Header.Set("Cache-Control", "no-cache")
  ctx.Response.Header.Set("Connection", "keep-alive")
  ctx.Response.Header.Set("Transfer-Encoding", "chunked")
  ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
  ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")

  ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
    defer func() {
      if r := recover(); r != nil {
        fmt.Println("Recovered in SSE writer:", r)
      }
    }()

    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
      select {
      case <-ticker.C:
        status, err := GetStatus(telescope)

        if err != nil {
          fmt.Printf("Error while getting status: %v\n", err)
          return
        }

        data, err := json.Marshal(status)

        if err != nil {
          fmt.Printf("Error while marshaling JSON: %v\n", err)
          return
        }

        fmt.Fprintf(w, "data: %s\n\n", string(data))

        fmt.Println(string(data))

        if err := w.Flush(); err != nil {
          fmt.Printf("Error while flushing: %v. Closing connection.\n", err)
          return
        }
      case <-c.Context().Done():
        fmt.Println("Client disconnected. Closing connection.")
        return
      }
    }
  }))

  return nil
}

Which to me, looks good. I can log the message without issue ...

However, when connecting from a browser side client ... the connection is stuck in the "connecting" phase of the event source connection.

I can see, that when the endpoint is called from my client, the server is logging correctly:

CleanShot 2024-02-08 at 11 50 20

But the connection remains as pending:

CleanShot 2024-02-08 at 11 49 24@2x

Also, no errors when requesting application/json (so for me, it isn't a case that the underlying code has an issue):

CleanShot 2024-02-08 at 12 03 50@2x

The front end JS code is standard for the EventSource API.


Headers

CleanShot 2024-02-08 at 11 54 39@2x

Reproduction

I can also provide access to the repository for a minimal reproduction if the issue isn't apparent from what I have supplied if needed, please just request access for your username and I can provide it (as long as you are listed as a core maintainer of this repo).

Code Snippet (optional)

/*****************************************************************************************************************/

//  @author     Michael Roberts <michael@observerly.com>
//  @package    @observerly/nox/telescope
//  @license    Copyright Β© 2021-2023 observerly

/*****************************************************************************************************************/

package telescope

/*****************************************************************************************************************/

import (
    "bufio"
    "encoding/json"
    "fmt"
    "strings"
    "time"

    "github.com/gofiber/fiber/v2"
    "github.com/observerly/alpacago/pkg/alpacago"
    "github.com/valyala/fasthttp"

    "nox/internal/common"
    "nox/internal/middleware"
)

/*****************************************************************************************************************/

type GetStatusHandlerResponse struct {
    Connected bool `json:"connected"`
}

/*****************************************************************************************************************/

type GetStatusChannels struct {
    Connected chan bool `json:"connected"`
}

/*****************************************************************************************************************/

func GetStatus(telescope *alpacago.Telescope) (GetStatusHandlerResponse, error) {
    // Create channels for the status values:
    channels := GetStatusChannels{}

    // Create a wait group for the status values:
    wg, channels, errors := common.SetupWaitGroupForStruct(channels)

    // Get the connection status:
    go func() {
        defer wg.Done()
        common.RetrieveAndSendToChannel(telescope.IsConnected, channels.Connected, errors)
    }()

    go func() {
        // Wait for all the goroutines to finish:
        wg.Wait()
        // Close the channels:
        common.CloseChannelsForStruct(channels)
    }()

    status := &GetStatusHandlerResponse{}

    // Extract the values from the channels:
    err := common.ExtractValueFromChannelStruct(channels, status)

    // Check if we encountered any errors while extracting the values:
    if len(errors) > 0 {
        return *status, fmt.Errorf("encountered errors while retrieving status values: %v", errors)
    }

    // If we encounter an error, return the error:
    if err != nil {
        return *status, err
    }

    // Return the status values:
    return *status, nil
}

/*****************************************************************************************************************/

func GetStatusHandler(c *fiber.Ctx) error {
    // Get the telescope client from the context middleware:
    telescope, ok := c.Locals("telescope").(*alpacago.Telescope)

    // If the telescope client is not available in the context, return an error:
    if !ok {
        return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
            "error": "Telescope is not available in context",
        })
    }

    // Peak at the incoming Accept request header:
    accept := c.Request().Header.Peek("Accept")

    // Check whether the Accept header is set to text/event-stream:
    if c.Accepts("text/event-stream") == "text/event-stream" && strings.Contains(string(accept), "text/event-stream") {
        ctx := middleware.TextEventStream(c)

        ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
            defer func() {
                if r := recover(); r != nil {
                    fmt.Println("Recovered in SSE writer:", r)
                }
            }()

            ticker := time.NewTicker(1 * time.Second)
            defer ticker.Stop()

            for {
                select {
                case <-ticker.C:
                    status, err := GetStatus(telescope)

                    if err != nil {
                        fmt.Printf("Error while getting status: %v\n", err)
                        return
                    }

                    data, err := json.Marshal(status)

                    if err != nil {
                        fmt.Printf("Error while marshaling JSON: %v\n", err)
                        return
                    }

                    fmt.Fprintf(w, "data: %s\n\n", string(data))

                    fmt.Println(string(data))

                    if err := w.Flush(); err != nil {
                        fmt.Printf("Error while flushing: %v. Closing connection.\n", err)
                        return
                    }
                case <-c.Context().Done():
                    fmt.Println("Client disconnected. Closing connection.")
                    return
                }
            }
        }))

        return nil
    }

    // Get the telescope status
    status, err := GetStatus(telescope)

    if err != nil {
        return c.Status(fiber.StatusInternalServerError).JSON(
            common.ErrorResponse{
                Error: err.Error(),
            },
        )
    }

    // Return the telescope status:
    return c.JSON(status)
}

/*****************************************************************************************************************/

Checklist:

efectn commented 8 months ago

@michealroberts is your endpoint working if you remove

case <-c.Context().Done():
    fmt.Println("Client disconnected. Closing connection.")
    return
michealroberts commented 8 months ago

@efectn Yeh, so it still doesn't work when removing that unfortunately 😞

I have a branch here: https://github.com/observerly/nox/pull/48 for a full reproduction that you should be able to access. In that code, I've added the basic example given in the gofiber/examples repo for SSE setup, line for line, and unfortunately it still doesn't work ...

Are you able to work with SSE on the latest versions of Fiber and fasthttp?

efectn commented 8 months ago

@efectn Yeh, so it still doesn't work when removing that unfortunately 😞

I have a branch here: observerly/nox#48 for a full reproduction that you should be able to access. In that code, I've added the basic example given in the gofiber/examples repo for SSE setup, line for line, and unfortunately it still doesn't work ...

Are you able to work with SSE on the latest versions of Fiber and fasthttp?

This one works for me https://paste.laravel.io/8c6a1464-4f52-46c1-b362-ab49f5ad60cf

2024-02-09_18-40

michealroberts commented 8 months ago

@efectn Aye aye aye, ok. I feel like I have narrowed it down to be able to replicate it.

I have the ETag middleware installed from github.com/gofiber/fiber/v2/middleware/etag ... I guess this somehow causes issues in terms of headers πŸ€”

What are your thoughts on this? I should be able to get a minimal reproduction.

efectn commented 8 months ago

@efectn Aye aye aye, ok. I feel like I have narrowed it down to be able to replicate it.

I have the ETag middleware installed from github.com/gofiber/fiber/v2/middleware/etag ... I guess this somehow causes issues in terms of headers πŸ€”

What are your thoughts on this? I should be able to get a minimal reproduction.

Yes it seems. You can disable etag for specific path like:

app.Use(etag.New(etag.Config{
    Next: func(c *fiber.Ctx) bool {
        return c.Path() == "/sse"
    },
}))
michealroberts commented 8 months ago

@efectn I think I will disable ETags globally for now, I have quite a number of SSE routes.

I wonder if I should open up a separate minimal, reproducible, example of the SSE + ETag issue, and maybe start to work on a possible fix ...

sdaduanbilei commented 7 months ago

I have the same problem.

michealroberts commented 5 months ago

I will start working on this issue this week.