cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0

Race condition when sending messages with the MQTT protocol #1093

Closed yanmxa closed 2 months ago

yanmxa commented 2 months ago

I launched 10 goroutines to send messages using the MQTT protocol. After a while, the program may panic with the error shown below.

Code to reproduce the issue:

    for i := 0; i < count; i++ {
        // Pass i explicitly so each goroutine logs its own index.
        go func(i int) {
            for {
                e := cloudevents.NewEvent()
                e.SetID(uuid.New().String())
                e.SetType("com.cloudevents.sample.sent")
                e.SetSource("https://github.com/cloudevents/sdk-go/samples/mqtt/sender")
                // Keep err local to the goroutine instead of writing to a shared variable.
                if err := e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
                    "id":      rand.Intn(100), // generates a random int between 0 and 99
                    "message": "Hello, World!",
                }); err != nil {
                    log.Printf("failed to set data: %v", err)
                }
                // All goroutines send through the same client c.
                if result := c.Send(
                    cecontext.WithTopic(ctx, "test-topic"),
                    e,
                ); cloudevents.IsUndelivered(result) {
                    log.Printf("failed to send: %v", result)
                } else {
                    log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
                }
                time.Sleep(time.Second)
            }
        }(i)
        time.Sleep(1 * time.Second)
    }
    <-ctx.Done()

Error message:

panic: runtime error: growslice: len out of range [recovered]
        panic: bytes.Buffer: too large

goroutine 9 [running]:
bytes.growSlice.func1()
        /usr/local/go/src/bytes/buffer.go:232 +0x45
panic({0x725280?, 0x813980?})
        /usr/local/go/src/runtime/panic.go:914 +0x21f
bytes.growSlice({0xc000500bc0, 0x16, 0x44df49?}, 0x40?)
        /usr/local/go/src/bytes/buffer.go:249 +0x8e
bytes.(*Buffer).grow(0xc0007119c8, 0x101000000000000)
        /usr/local/go/src/bytes/buffer.go:151 +0x13d
bytes.(*Buffer).WriteString(0xc0007119c8, {0x0, 0x101000000000000})
        /usr/local/go/src/bytes/buffer.go:191 +0x59
github.com/eclipse/paho.golang/packets.writeString({0x0, 0x101000000000000}, 0x4f66ab?)
        /home/myan/go/pkg/mod/github.com/eclipse/paho.golang@v0.12.0/packets/packets.go:437 +0x3c
github.com/eclipse/paho.golang/packets.(*Properties).Pack(0xc000724780, 0x3)
        /home/myan/go/pkg/mod/github.com/eclipse/paho.golang@v0.12.0/packets/properties.go:388 +0x865
github.com/eclipse/paho.golang/packets.(*Publish).Buffers(0xc000090880)
        /home/myan/go/pkg/mod/github.com/eclipse/paho.golang@v0.12.0/packets/publish.go:70 +0x7b
github.com/eclipse/paho.golang/packets.(*ControlPacket).WriteTo(0xc000711b60, {0x814ac0, 0xc000182238})
        /home/myan/go/pkg/mod/github.com/eclipse/paho.golang@v0.12.0/packets/packets.go:317 +0x84
github.com/eclipse/paho.golang/packets.(*Publish).WriteTo(0xc0001c6580?, {0x814ac0?, 0xc000182238?})
        /home/myan/go/pkg/mod/github.com/eclipse/paho.golang@v0.12.0/packets/publish.go:78 +0x6a
github.com/eclipse/paho.golang/paho.(*Client).Publish(0xc0001ea140, {0x818708, 0xc0004370b0}, 0xc0001c6580)
        /home/myan/go/pkg/mod/github.com/eclipse/paho.golang@v0.12.0/paho/client.go:797 +0x2bb
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2.(*Protocol).Send(0xc0001c6600, {0x818708, 0xc0004370b0}, {0x8189a8?, 0xc000480a00}, {0x0, 0x0, 0x0})
        /home/myan/workspace/cloudevents-sdk-go/protocol/mqtt_paho/v2/protocol.go:105 +0x270
github.com/cloudevents/sdk-go/v2/client.(*ceClient).Send(0xc0000a6000, {0x818708, 0xc0004370b0}, {{0x81cb20, 0xc000659180}, {0xc00049e510, 0x22, 0x30}, 0x0, 0x0})
        /home/myan/workspace/cloudevents-sdk-go/v2/client/client.go:138 +0x2dc
main.main.func1()
        /home/myan/workspace/cloudevents-sdk-go/samples/mqtt/sender/main.go:68 +0x411
created by main.main in goroutine 1
        /home/myan/workspace/cloudevents-sdk-go/samples/mqtt/sender/main.go:55 +0x445
exit status 2

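The root cause is that all of these goroutines load their events into the same shared object, so concurrent Send calls can modify it while another call is still serializing it (the panic above is raised while paho packs the publish properties). To resolve this, each Send call must work on its own object rather than one shared across goroutines.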

embano1 commented 2 months ago

Which version of the SDK are you using? I'm asking because line 105, which you are probably referring to as the root cause in the stack trace above, currently returns an error: https://github.com/cloudevents/sdk-go/blob/1aecb204b50da7dae3d5586030c7047479334bdc/protocol/mqtt_paho/v2/protocol.go#L105

embano1 commented 2 months ago

Have you tried the same with paho.golang@v0.12.0/paho/client.go directly, to verify that it's our SDK causing the race?

yanmxa commented 2 months ago

Yes. I did the same thing using the paho.golang@v0.12.0 SDK directly, and the issue did not appear.