rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License
169 stars 20 forks source link

Heartbeats misleading behaviour #325

Open HustonMmmavr opened 4 months ago

HustonMmmavr commented 4 months ago

Is your feature request related to a problem? Please describe.

Hello!

There is a question about hearbeats, for example the next code would raise to logs next messages

Code:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"

    "github.com/google/uuid"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func CheckErr(err error) {
    if err != nil {
        fmt.Printf("%s ", err)
        os.Exit(1)
    }
}

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
    defer stop()
    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().
            SetHost("localhost").
            SetPort(5552).
            SetUser("guest").
            SetPassword("guest").SetRequestedHeartbeat(time.Second * 1))
    CheckErr(err)

    streamName := uuid.New().String()
    err = env.DeclareStream(streamName,
        &stream.StreamOptions{
            MaxLengthBytes: stream.ByteCapacity{}.MB(500),
        },
    )
    CheckErr(err)

    consName := "consumer_"
    producer, err := env.NewProducer(streamName, nil)
    CheckErr(err)

    go func() {
        for i := 0; i < 1000; i++ {
            err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
            CheckErr(err)
            time.Sleep(5 * time.Second)
        }
    }()

    handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
        err := consumerContext.Consumer.StoreCustomOffset(consumerContext.Consumer.GetOffset())
        if err != nil {
            CheckErr(err)
        }
        fmt.Println(string(message.GetData()))
    }

    cons, err := env.NewConsumer(streamName, handleMessages, stream.
        NewConsumerOptions().
        SetManualCommit().
        SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName),
    )
    CheckErr(err)
    defer cons.Close()

    <-ctx.Done()
}
go run main.go
hello_world_0
hello_world_1
hello_world_2
hello_world_3
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
hello_world_4
hello_world_5
hello_world_6
hello_world_7
2024/06/24 18:24:55 [warn] - Missing heart beat: 2
2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2
2024/06/24 18:24:55 [warn] - Missing heart beat: 2
2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2
2024/06/24 18:24:55 [error] - Producer BatchSend error during flush: write tcp 127.0.0.1:37404->127.0.0.1:5552: use of closed network connection
producer id: 0  closed exit status 1

Im a bit confused, because the demo rabbit is alive and connection seems too - messages still consuming. Is it a correct behaviour?

Seems the heartBeat uses a hardcoded heartbeat intervals and the heartbeat condition fails

Describe the solution you'd like

Seems hearbeat should be called every interval specified by user, but I may be wrong

Describe alternatives you've considered

No response

Additional context

No response

Gsantomaggio commented 4 months ago

Hi @HustonMmmavr Thank you for testing and reporting the issue. This is a bug. When we set SetRequestedHeartbeat, we should also change the ticket timer. Even we should put a limit; for example, 1 second is too low.