syumai / workers

Go package to run an HTTP server on Cloudflare Workers.
MIT License
698 stars 41 forks source link

Deadlock reading from R2 and writing to a ZIP writer #103

Closed eliezedeck closed 7 months ago

eliezedeck commented 7 months ago

Hi,

I'm trying to stream ZIP file that is dynamically created on the fly. The implementation already works (not in tinygo, but in full go), but it exhibits a panic at runtime. Here is the error as captured by wrangler:

POST http://127.0.0.1:9899/services/stream-zip-from-r2 - Exception Thrown @ 4/16/2024, 6:54:17 PM
  (log) fatal error: all goroutines are asleep - deadlock!
  (log) 
  (log) goroutine 1 [chan receive]:
  (log) github.com/syumai/workers.Serve(...)
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/handler.go:95
  (log) main.main()
  (log)         /Users/elie/Projects/workers-go/main.go:164 +0x1c
  (log) 
  (log) goroutine 6 [select]:
  (log) io.(*pipe).read(0x18c8ba0, {0x1bec800, 0x4100, 0x4100})
  (log)         /usr/local/go/src/io/pipe.go:57 +0x6
  (log) io.(*PipeReader).Read(0x18c8ba0, {0x1bec800, 0x4100, 0x4100})
  (log)         /usr/local/go/src/io/pipe.go:134 +0x4
  (log) github.com/syumai/workers/internal/jsutil.(*readerToReadableStream).Pull(0x1bb0300, {{}, 0x7ff800010000009d, 0x1ba3588})
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/internal/jsutil/stream.go:121 +0x6
  (log) github.com/syumai/workers/internal/jsutil.ConvertReaderToReadableStream.func1.1({{}, 0x0, 0x0}, {0x1b94360, 0x2, 0x2})
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/internal/jsutil/stream.go:170 +0x5
  (log) syscall/js.handleEvent()
  (log)         /usr/local/go/src/syscall/js/func.go:100 +0x23
  (log) syscall/js.Value.New({{}, 0x7ff800040000000f, 0x180c068}, {0x1841e38, 0x1, 0x1})
  (log)         /usr/local/go/src/syscall/js/js.go:431 +0x3
  (log) github.com/syumai/workers/internal/jsutil.NewPromise(...)
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/internal/jsutil/jsutil.go:34
  (log) github.com/syumai/workers/internal/jsutil.ConvertReaderToReadableStream.func1({{}, 0x7ff8000100000098, 0x1ba3578}, {0x1bc8b60, 0x1, 0x1})
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/internal/jsutil/stream.go:178 +0xd
  (log) syscall/js.handleEvent()
  (log)         /usr/local/go/src/syscall/js/func.go:100 +0x23
  (log) 
  (log) goroutine 8 [select]:
  (log) github.com/syumai/workers/internal/jsutil.(*readableStreamToReadCloser).Read(0x1b5a000, {0x1a3c000, 0x8000, 0x8000})
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/internal/jsutil/stream.go:61 +0x22
  (log) io.copyBuffer({0x134828, 0x1b84000}, {0x134e00, 0x1bc8730}, {0x0, 0x0, 0x0})
  (log)         /usr/local/go/src/io/io.go:429 +0x27
  (log) io.Copy(...)
  (log)         /usr/local/go/src/io/io.go:388
  (log) github.com/syumai/workers/internal/jsutil.(*readableStreamToReadCloser).WriteTo(0x1b5a000, {0x134828, 0x1b84000})
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/internal/jsutil/stream.go:96 +0xc
  (log) io.copyBuffer({0x134828, 0x1b84000}, {0x134e20, 0x1b5a000}, {0x1bf1000, 0x4100, 0x4100})
  (log)         /usr/local/go/src/io/io.go:411 +0x22
  (log) io.CopyBuffer({0x134828, 0x1b84000}, {0x134e20, 0x1b5a000}, {0x1bf1000, 0x4100, 0x4100})
  (log)         /usr/local/go/src/io/io.go:402 +0x4
  (log) main.main.func2({0x138618, 0x1ba5400})
  (log)         /Users/elie/Projects/workers-go/main.go:104 +0xa1
  (log) github.com/labstack/echo/v4.(*Echo).add.func1({0x138618, 0x1ba5400})
  (log)         /Users/elie/go/pkg/mod/github.com/labstack/echo/v4@v4.11.4/echo.go:582 +0x5
  (log) github.com/labstack/echo/v4.(*Echo).ServeHTTP(0x18c4908, {0x135e40, 0x1b84e10}, 0x1b8ed80)
  (log)         /Users/elie/go/pkg/mod/github.com/labstack/echo/v4@v4.11.4/echo.go:669 +0x52
  (log) github.com/syumai/workers.handleRequest.func1()
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/handler.go:77 +0x4
  (log) created by github.com/syumai/workers.handleRequest in goroutine 7
  (log)         /Users/elie/go/pkg/mod/github.com/syumai/workers@v0.23.3/handler.go:74 +0x28
  (warn) exit code: 2
✘ [ERROR]   TypeError: Cannot read properties of undefined (reading 'exports')

Here is the implementation of a simple streaming ZIP:

e.POST("/services/stream-zip-from-r2", func(c echo.Context) error {
    // Get the request
    req := struct {
        BucketName  string `json:"bucketName" validate:"required,oneof=expense-documents profile-photos"`
        ZipFileName string `json:"zipFileName" validate:"required,min=1"`

        Files []struct {
            FileName string `json:"name" validate:"required,min=1"`
            FileURL  string `json:"url" validate:"required,min=1"`
        } `json:"files" validate:"required,dive"`

        Relaxed bool `json:"relaxed"` // Don't stop ZIP creation if a file is not found
    }{}
    if _, err := ValidateBodyStrictInto(c.Request().Body, &req); err != nil {
        return c.JSON(http.StatusBadRequest, map[string]string{"error": err.Error()})
    }

    // Get the bucket binding
    bucket, err := getBucketBinding(c.Request().Context(), req.BucketName)
    if err != nil {
        return c.JSON(http.StatusBadRequest, map[string]string{"error": err.Error()})
    }

    // Start streaming
    c.Response().Header().Set(echo.HeaderContentType, "application/zip")
    c.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+req.ZipFileName)
    c.Response().WriteHeader(http.StatusOK)

    // Prepare to stream a ZIP file from R2
    writer := zip.NewWriter(c.Response())
    writer.RegisterCompressor(zip.Deflate, func(out io.Writer) (io.WriteCloser, error) {
        return flate.NewWriter(out, flate.NoCompression)
    })
    //defer func(writer *zip.Writer) {
    //  if err := writer.Close(); err != nil {
    //      c.Logger().Errorf("failed to close ZIP writer: %v", err)
    //  }
    //}(writer)

    count := 0
    for _, file := range req.Files {
        // Get the file from R2
        reader, err := bucket.Get(file.FileURL)
        if err != nil {
            c.Logger().Errorf("failed to get file '%s' from R2: %v", file.FileName, err)
            return err
        }
        if reader == nil {
            if req.Relaxed {
                c.Logger().Warnf("file '%s' not found", file.FileURL)
                continue
            } else {
                c.Logger().Errorf("file '%s' not found", file.FileURL)
                return fmt.Errorf("file '%s' not found", file.FileURL)
            }
        }
        // no need to close the reader

        // Add the file to the ZIP
        writer, err := writer.Create(file.FileName)
        if err != nil {
            c.Logger().Errorf("failed to create file '%s' in ZIP: %v", file.FileName, err)
            return err
        }
        c.Logger().Infof("Streaming file '%s' to ZIP (length = %d bytes)", file.FileName, reader.Size)

        // Stream the file
        blob := make([]byte, reader.Size)
        if _, err := io.ReadFull(reader.Body, blob); err != nil {
            c.Logger().Errorf("failed to read file '%s' from R2: %v", file.FileName, err)
            return err
        }
        if _, err := writer.Write(blob); err != nil {
            c.Logger().Errorf("failed to write file '%s' to ZIP: %v", file.FileName, err)
            return err
        }

        // This causes a quick deadlock
        //if _, err := io.CopyBuffer(writer, reader.Body, make([]byte, 16_640)); err != nil {
        //  c.Logger().Errorf("failed to write file '%s' to ZIP: %v", file.FileName, err)
        //  return err
        //}

        count++
    }

    return nil
})

Is this a known issue, or am I doing something wrong? Note: the defer to close the writer doesn't have any effect, it crashes all the same.

Another note: I have create a test route that will just generate random string and ZIP it (just store, no compression), and that one had no panic. So, I'm not sure if this is an R2 read?

Thank you.

syumai commented 7 months ago

@eliezedeck Thank you for filing an issue! I'm sorry, but I don't know what causes this issue. It's probably correct to say that reading from R2 is the cause. This could either be due to file size or stream conversion. I will investigate. If you could share the repository for reproduction (including dependencies, go.mod, etc.), it would be helpful for investigation.

eliezedeck commented 7 months ago

I'm not allowed to directly share the repo. However, I'll do my best to give the max info I can to help.

go.mod:

module workersgo

go 1.22.2

require (
    github.com/labstack/echo/v4 v4.11.4
    github.com/syumai/workers v0.23.3
)

require (
    github.com/gabriel-vasile/mimetype v1.4.3 // indirect
    github.com/go-playground/locales v0.14.1 // indirect
    github.com/go-playground/mold/v4 v4.5.0 // indirect
    github.com/go-playground/universal-translator v0.18.1 // indirect
    github.com/go-playground/validator/v10 v10.19.0 // indirect
    github.com/gosimple/slug v1.13.1 // indirect
    github.com/gosimple/unidecode v1.0.1 // indirect
    github.com/klauspost/compress v1.17.7 // indirect
    github.com/labstack/gommon v0.4.2 // indirect
    github.com/leodido/go-urn v1.4.0 // indirect
    github.com/mattn/go-colorable v0.1.13 // indirect
    github.com/mattn/go-isatty v0.0.20 // indirect
    github.com/segmentio/go-camelcase v0.0.0-20160726192923-7085f1e3c734 // indirect
    github.com/segmentio/go-snakecase v1.2.0 // indirect
    github.com/tidwall/gjson v1.14.2 // indirect
    github.com/tidwall/match v1.1.1 // indirect
    github.com/tidwall/pretty v1.2.0 // indirect
    github.com/tidwall/sjson v1.2.5 // indirect
    github.com/valyala/bytebufferpool v1.0.0 // indirect
    github.com/valyala/fasttemplate v1.2.2 // indirect
    golang.org/x/crypto v0.22.0 // indirect
    golang.org/x/net v0.21.0 // indirect
    golang.org/x/sys v0.19.0 // indirect
    golang.org/x/text v0.14.0 // indirect
)

wrangler.toml:

name = "go-services"
main = "./build/worker.mjs"
compatibility_date = "2024-04-05"
workers_dev = false
send_metrics = true

account_id = "xxxxxxxxxxxxxxxx"

[build]
command = "make build"

[env.dev]
name = "go-services"
vars = { ENV = "dev" }
r2_buckets = [
    { binding = "EXPENSE_DOCUMENTS", bucket_name = "expense-documents", preview_bucket_name = "expense-documents-preview" },
    { binding = "PROFILE_PHOTOS", bucket_name = "profile-photos", preview_bucket_name = "profile-photos-preview" }
]

[env.staging]
name = "staging-go-services"
vars = { ENV = "staging" }
r2_buckets = [
    { binding = "EXPENSE_DOCUMENTS", bucket_name = "staging-expense-documents", preview_bucket_name = "staging-expense-documents-preview" },
    { binding = "PROFILE_PHOTOS", bucket_name = "staging-profile-photos", preview_bucket_name = "staging-profile-photos-preview" }
]

[env.production]
name = "go-services"
vars = { ENV = "production" }
r2_buckets = [
    { binding = "EXPENSE_DOCUMENTS", bucket_name = "expense-documents", preview_bucket_name = "expense-documents-preview" },
    { binding = "PROFILE_PHOTOS", bucket_name = "profile-photos", preview_bucket_name = "profile-photos-preview" }
]

Makefile:

.PHONY: dev
dev:
    wrangler dev --port 9899 --inspector-port 9232

.PHONY: build
build:
    go run github.com/syumai/workers/cmd/workers-assets-gen@v0.23.3 -mode=go
    GOOS=js GOARCH=wasm go build -o ./build/app.wasm .

.PHONY: deploy
deploy:
    wrangler deploy --env staging --minify --keep-vars

The script is simple enough, with this section as start:

package main

import (
    "compress/flate"
    "context"
    "errors"
    "fmt"
    "io"
    "math/rand"
    "net/http"
    "time"

    "github.com/klauspost/compress/zip"
    "github.com/labstack/echo/v4"
    "github.com/syumai/workers"
    "github.com/syumai/workers/cloudflare"
)

func main() {
    e := echo.New()

    e.GET("/", func(c echo.Context) error {
        return c.String(http.StatusOK, "Hello, World!")
    })

For reference, here is the test file that is generating garbage ZIP (but still accurate and correct), this is the one that does not crash:

    e.GET("/services/test/generate-zip", func(c echo.Context) error {
        // Prepare to stream a ZIP file
        writer := zip.NewWriter(c.Response())
        writer.RegisterCompressor(zip.Deflate, func(out io.Writer) (io.WriteCloser, error) {
            return flate.NewWriter(out, flate.NoCompression)
        })
        defer func(writer *zip.Writer) {
            _ = writer.Close()
        }(writer)

        // Start streaming
        c.Response().Header().Set(echo.HeaderContentType, "application/zip")
        c.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename=dummy.zip")
        c.Response().WriteHeader(http.StatusOK)

        // Seed the random number generator
        rand.Seed(time.Now().UnixNano())

        // Generate dummy files
        for i := 0; i < 20; i++ {
            // Generate a random string of a random length
            length := rand.Intn(100) // Change this value to adjust the maximum length
            randomString := make([]byte, length)
            for i := range randomString {
                randomString[i] = byte(rand.Intn(26) + 97) // Generate a random lowercase letter
            }

            // Add the file to the ZIP
            fileWriter, err := writer.Create(fmt.Sprintf("file%d.txt", i))
            if err != nil {
                c.Logger().Errorf("failed to create file 'file%d.txt' in ZIP: %v", i, err)
                return err
            }

            // Write the random string to the file
            if _, err := fileWriter.Write(randomString); err != nil {
                c.Logger().Errorf("failed to write to file 'file%d.txt' in ZIP: %v", i, err)
                return err
            }
        }

        return nil
    })

Hope that helps.

syumai commented 7 months ago

@eliezedeck Thank you! I'll check this later.

syumai commented 7 months ago

I've confirmed the problem happens when writing large file to zip. I'll do more investigation.

syumai commented 7 months ago

@eliezedeck I found that response streaming based on io.Pipe (in handler.go) was causing the deadlock. If you use a buffer for zip writing and stream the response from there, you won't have any problems. see: https://github.com/syumai/workers-playground/blob/3877c5bac5c6e747b7b5ff258c18d71ff3ace5a9/syumai-workers-repro-issue-103/main.go#L33-L37

However, this is not a complete solution as there may be very large files and require a large amount of memory. Therefore, I'll continue to investigate.

syumai commented 7 months ago

works

* R2Object<ReadableStream> -> zip.Writer(bytes.Buffer)
* bytes.Buffer -> ResponseWriter<ReadableStream>

not works

* R2Object<ReadableStream> -> zip.Writer(ResponseWriter<ReadableStream>)

Probably, a direct connection between ReadableStreams on the JS side is considered blocked on Go runtime if both sides of the stream are waiting on a Promise.

syumai commented 7 months ago

@eliezedeck Perhaps https://github.com/syumai/workers/releases/tag/v0.25.0 fixed the problem. Please check it!

Now, my reproduction code is working.

eliezedeck commented 7 months ago

Wonderful @syumai!

It works flawlessly now. You made the library even better by fixing streaming I see. Just golden!