rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License

LastConsumed behaviour seems a little broken #321

Closed HustonMmmavr closed 1 week ago

HustonMmmavr commented 1 week ago

Is your feature request related to a problem? Please describe.

Hello!

I've found that the LastConsumed offset replays the last message in the stream. Here is a repro:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"

    "github.com/google/uuid"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func CheckErr(err error) {
    if err != nil {
        fmt.Printf("%s ", err)
        os.Exit(1)
    }
}

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()
    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().
            SetHost("localhost").
            SetPort(5552).
            SetUser("guest").
            SetPassword("guest").SetRequestedHeartbeat(time.Second * 10))
    CheckErr(err)

    streamName := uuid.New().String()
    err = env.DeclareStream(streamName,
        &stream.StreamOptions{
            MaxLengthBytes: stream.ByteCapacity{}.MB(500),
        },
    )
    CheckErr(err)

    consName := "consumer_"
    producer, err := env.NewProducer(streamName, nil)
    CheckErr(err)

    go func() {
        for i := 0; i < 1; i++ {
            err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
            CheckErr(err)
        }
    }()

    msgChan := make(chan string)

    handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
        err := consumerContext.Consumer.StoreCustomOffset(consumerContext.Consumer.GetOffset())
        if err != nil {
            CheckErr(err)
        }
        fmt.Printf("offset %d\n", consumerContext.Consumer.GetOffset())
        msgChan <- string(message.GetData())
    }

    cons, err := env.NewConsumer(streamName, handleMessages, stream.
        NewConsumerOptions().
        SetManualCommit().
        SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName),
    )
    CheckErr(err)
    fmt.Println(<-msgChan)
    cons.Close()

    consCopy, err := env.NewConsumer(streamName, handleMessages, stream.
        NewConsumerOptions().
        SetManualCommit().
        SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName),
    )
    CheckErr(err)
    fmt.Println(<-msgChan)
    defer consCopy.Close()

    <-ctx.Done()
}

The output is as follows:

offset 0
hello_world_0
offset 0
hello_world_0

Is this expected behaviour?

Describe the solution you'd like

In my code I have to use the Offset specification instead of LastConsumed and, inside the callback, check whether the consumer context offset equals the value returned by QueryOffset when the consumer started. If it does, the message was already processed and should be skipped.
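A minimal sketch of that skip check, with the stream client left out so it runs on its own. The names `skipFilter` and `shouldProcess` are hypothetical and not part of the client library; in a real callback the offset would come from the consumer context and the stored value from QueryOffset.

```go
package main

import "fmt"

// skipFilter remembers the offset that was already stored when the consumer
// started. hasStored is false when no offset had been stored yet, in which
// case nothing is skipped. (Hypothetical helper, not a client API.)
type skipFilter struct {
	stored    int64
	hasStored bool
}

// shouldProcess reports whether a message at msgOffset still needs handling.
// Only the message whose offset equals the stored one is treated as done.
func (f skipFilter) shouldProcess(msgOffset int64) bool {
	return !(f.hasStored && msgOffset == f.stored)
}

func main() {
	// Offset 0 was stored by the previous consumer run.
	f := skipFilter{stored: 0, hasStored: true}
	for _, off := range []int64{0, 1, 2} {
		fmt.Printf("offset %d process=%v\n", off, f.shouldProcess(off))
	}
}
```

With offset 0 stored, the first message is reported as already processed and the following ones are handled normally.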

But would it be correct to do something like this instead? Query the offset before starting the consumer and, if one exists, increment it:

offset, err := env.QueryOffset(consName, streamName)
// handle error
if offsetExists { // placeholder: true when a stored offset was found
    offset++ // is this increment correct?
}
consCopy, err := env.NewConsumer(streamName, handleMessages, stream.
    NewConsumerOptions().
    SetManualCommit().
    SetOffset(stream.OffsetSpecification{}.Offset(offset)).SetConsumerName(consName),
)

If this solution is correct, maybe it would be worth setting lastOffset + 1 here
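The query-then-increment idea can be checked against a toy model in which the stream is just a slice indexed by offset. This is a sketch under that assumption only: `resumeOffset` and `deliverFrom` are hypothetical names, and starting at 0 when nothing was stored is a choice made for the example, not something the client prescribes.

```go
package main

import "fmt"

// resumeOffset returns the offset a restarted consumer should start from.
// stored is the last stored offset; found is false if none was stored.
func resumeOffset(stored int64, found bool) int64 {
	if !found {
		return 0 // assumption: with no stored offset, start at the beginning
	}
	return stored + 1 // the stored message was already handled
}

// deliverFrom models a stream as a slice indexed by offset and returns
// every message at or after start.
func deliverFrom(stream []string, start int64) []string {
	if start < 0 {
		start = 0
	}
	if start >= int64(len(stream)) {
		return nil
	}
	return stream[start:]
}

func main() {
	stream := []string{"hello_world_0"}

	// Restarting at the stored offset itself redelivers the message.
	fmt.Println(deliverFrom(stream, 0))

	// Restarting at stored+1 delivers nothing until new messages arrive.
	fmt.Println(deliverFrom(stream, resumeOffset(0, true)))
}
```

In this model, restarting at the stored offset yields `hello_world_0` a second time, which matches the repro output, while restarting at the incremented offset yields nothing until a new message is published.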

Describe alternatives you've considered

No response

Additional context

No response

Gsantomaggio commented 1 week ago

Hi @HustonMmmavr, the LastConsumed API was meant to help the user restart from the last stored message. Over the years, I have seen it create more confusion than help. The name is also misleading.

This is why I decided to deprecate the API. The correct way is what you have done:

offset, err := env.QueryOffset(consName, streamName) 
//handle error
if offsetExists {
    offset++ // is this increment correct
}

HustonMmmavr commented 1 week ago

Got it! Thank you)

Gsantomaggio commented 1 week ago

The method is now deprecated: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/324

Thank you @HustonMmmavr