ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License
7.29k stars 394 forks source link

Swap message after sometime #459

Open dramirezp opened 2 months ago

dramirezp commented 2 months ago

I am using Watermill to develop software where I send a message, and it goes through service1, service2, and the last service. I use a slice to control the order of the messages (FIFO, as GoChannel should respect FIFO). After several runs, I am encountering an issue where Watermill is swapping messages. For example, I send message A and message B, but in the last service, message B arrives first and then message A. Attached is a small script where this problem is reflected. It seems to be a race condition because it doesn't always happen, but when the script is run and the issue occurs, it shows a message like this: Slice value 68ad9d74-c0eb-476f-9cc0-5da98d947b61 value in message f01fff7d-1fb6-45a4-bda6-07e021511d3f.

/*
This application is a test of Watermill, a Go library for working efficiently with message streams.
Sending and receiving messages from a channel.
*/

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var pubSub1 *gochannel.GoChannel
var safeSlice *SafeSlice

// SafeSlice is a slice of strings whose every access goes through a mutex.
// The handlers in this file use it to record message UUIDs in arrival order.
type SafeSlice struct {
    mu    sync.Mutex
    slice []string
}

// Append adds value to the end of the slice.
func (ss *SafeSlice) Append(value string) {
    ss.mu.Lock()
    ss.slice = append(ss.slice, value)
    ss.mu.Unlock()
}

// Get returns the element stored at index together with true. When index is
// outside the slice it returns the placeholder text "Index out of scope" and
// false.
func (ss *SafeSlice) Get(index int) (string, bool) {
    ss.mu.Lock()
    defer ss.mu.Unlock()

    if index >= 0 && index < len(ss.slice) {
        return ss.slice[index], true
    }
    return "Index out of scope", false
}

// Remove deletes the element at index, shifting later elements one position
// to the left. It reports whether an element was removed; an out-of-range
// index leaves the slice untouched and yields false.
func (ss *SafeSlice) Remove(index int) bool {
    ss.mu.Lock()
    defer ss.mu.Unlock()

    n := len(ss.slice)
    if index < 0 || index >= n {
        return false
    }
    // Close the gap in place, then drop the now-duplicated last element.
    copy(ss.slice[index:], ss.slice[index+1:])
    ss.slice = ss.slice[:n-1]
    return true
}

// service1 handles messages for the "service-1" handler. It records the
// message UUID in safeSlice first and then forwards the same message to the
// "service-2-input" topic through pubSub1. A publish failure panics; the
// handler otherwise always returns nil.
func service1(msg *message.Message) error {
    safeSlice.Append(msg.UUID)

    if err := pubSub1.Publish("service-2-input", msg); err != nil {
        panic(err)
    }
    return nil
}

// service2 handles messages for the "service-2" handler. It prints the
// incoming message and hands it back unchanged as the only produced message;
// it never returns an error.
func service2(msg *message.Message) ([]*message.Message, error) {
    fmt.Printf("Message in service 2 %v\n", msg)

    // Add some logic

    produced := []*message.Message{msg}
    return produced, nil
}

// service_last handles messages for the "service_last" handler. It checks the
// message UUID against the first entry of safeSlice: on a match it prints
// "OK" and removes that entry; on a mismatch it prints both values and
// terminates the process with exit status 0.
func service_last(msg *message.Message) error {
    // The found flag is ignored: with an empty slice Get yields its
    // placeholder text, which then shows up in the mismatch output below.
    expected, _ := safeSlice.Get(0)

    fmt.Printf("service_last %v\n", msg)

    if msg.UUID != expected {
        fmt.Printf("Slice value %s value in message %s\n", expected, msg.UUID)
        os.Exit(0)
    }

    fmt.Println("OK")
    safeSlice.Remove(0)
    return nil
}

// main builds the three-stage pipeline (service-1 -> service-2 ->
// service_last) on a single GoChannel Pub/Sub, starts the router in the
// background and then publishes new messages to "service-1-input" forever.
// The process only ends through log.Fatalf or the os.Exit call in
// service_last.
func main() {
    // Upper bound on how long to wait for the router to report that it is
    // running before giving up.
    const routerStartTimeout = 1 * time.Second

    logger := watermill.NewStdLogger(true, true)
    safeSlice = &SafeSlice{}

    // NOTE(review): the zero-value Config leaves BlockPublishUntilSubscriberAck
    // false. Presumably Publish then returns without waiting for subscribers,
    // which may be what allows messages to overtake each other — confirm
    // against the GoChannel documentation.
    pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)

    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        log.Fatalf("could not create router: %v", err)
    }

    // Create handlers for each service
    router.AddNoPublisherHandler("service-1", "service-1-input", pubSub1, service1)
    router.AddHandler("service-2", "service-2-input", pubSub1, "service_last-input", pubSub1, service2)
    router.AddNoPublisherHandler("service_last", "service_last-input", pubSub1, service_last)

    // Start the router
    go func() {
        if err := router.Run(context.Background()); err != nil {
            log.Fatalf("could not run router: %v", err)
        }
    }()

    // Wait for the router's own readiness signal instead of sleeping for a
    // fixed time and hoping the handlers are subscribed by then.
    select {
    case <-router.Running():
    case <-time.After(routerStartTimeout):
        log.Fatalf("router did not start within %v", routerStartTimeout)
    }

    // Publish messages to start the pipeline. This loop never exits, so no
    // code may follow it.
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte{})
        if err := pubSub1.Publish("service-1-input", msg); err != nil {
            log.Fatalf("could not publish message: %v", err)
        }

        //time.Sleep(1000 * time.Millisecond)
    }
}
yashb042 commented 2 weeks ago

The GoChannel approach uses no goroutines internally if you don't explicitly add a Multiplier.

I tried running the code and seems like there's some problem indeed.