rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License
160 stars 13 forks source link

consumer offsets a bit unclear #326

Open HustonMmmavr opened 4 days ago

HustonMmmavr commented 4 days ago

Is your feature request related to a problem? Please describe.

Hello!

In my project I have to track offsets manually. This is done in a different (main) goroutine, while the consumer callback is called in its own goroutine. Because of this asynchronous design, I can't rely on consumer.Offset tracking outside of the consumer routine, so I have to do something like this:

package main

import (
    "context"
    "errors"
    "fmt"
    "os"
    "os/signal"
    "strconv"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// CheckErr is a fail-fast helper for this sample: a nil err is a no-op,
// while any other error is printed to standard output and the process is
// terminated with exit status 1.
func CheckErr(err error) {
    if err == nil {
        return
    }
    fmt.Printf("%s ", err)
    os.Exit(1)
}

// offsetMsg pairs a delivered message with the consumer offset that was read
// at the moment the consumer callback ran, so the pair can be handed to
// another goroutine over a channel.
type offsetMsg struct {
    // offt is the offset reported by the consumer when msg was delivered.
    // NOTE(review): newHandler narrows the value with int(...) and main widens
    // it again with int64(...) before storing; an int64 field would avoid the
    // round trip — confirm before changing, since all users must change together.
    offt int
    // msg is the delivered message itself.
    msg  *amqp.Message
}

// newHandler returns a consumer callback that forwards every delivered
// message, together with the offset the consumer reports at that moment,
// to acceptChan. The callback sends on acceptChan directly, so it waits
// for the channel to accept the value before returning.
func newHandler(acceptChan chan offsetMsg) func(stream.ConsumerContext, *amqp.Message) {
    forward := func(cc stream.ConsumerContext, delivered *amqp.Message) {
        // Read the offset first, then hand both pieces over as one value.
        current := int(cc.Consumer.GetOffset())
        acceptChan <- offsetMsg{offt: current, msg: delivered}
    }
    return forward
}

// storeOffset asks consumer c to persist offt as its custom offset and
// reports the outcome on standard output: the error on failure, or a
// confirmation line on success. The error (nil on success) is returned
// to the caller as well.
func storeOffset(c *stream.Consumer, offt int64) error {
    if err := c.StoreCustomOffset(offt); err != nil {
        fmt.Println(err)
        return err
    }
    fmt.Println("offset stored", offt)
    return nil
}

// main demonstrates manual offset tracking with a stream consumer.
//
// It connects to a local broker, declares the stream, publishes five
// messages from a background goroutine, and then consumes them with manual
// commit. The consumer callback forwards each message and its offset over a
// channel; a second goroutine prints them and stores the offset after every
// fifth message. The program runs until it receives an interrupt signal.
func main() {
    // Only os.Interrupt is registered: os.Kill (SIGKILL) can never be
    // trapped by a process, so listing it had no effect.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()
    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().
            SetHost("localhost").
            SetPort(5552).
            SetUser("guest").
            SetPassword("guest"))
    CheckErr(err)

    streamName := "stream"
    err = env.DeclareStream(streamName,
        &stream.StreamOptions{
            MaxLengthBytes: stream.ByteCapacity{}.MB(500),
        },
    )
    CheckErr(err)

    consName := "consumer_"
    producer, err := env.NewProducer(streamName, nil)
    CheckErr(err)

    // Publish the sample messages concurrently with the consumer setup.
    go func() {
        for i := 0; i < 5; i++ {
            err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
            CheckErr(err)
        }
    }()

    // Resume right after the last stored offset. When no offset has been
    // stored yet, offt keeps the value QueryOffset returned alongside
    // OffsetNotFoundError; any other error is fatal.
    offt, err := env.QueryOffset(consName, streamName)
    if err != nil {
        if !errors.Is(err, stream.OffsetNotFoundError) {
            panic(err)
        }
    } else {
        offt++
    }
    fmt.Println("queried offset")

    acceptCh := make(chan offsetMsg)
    handler := newHandler(acceptCh)

    cons, err := env.NewConsumer(streamName, handler, stream.
        NewConsumerOptions().
        SetManualCommit().
        SetOffset(stream.OffsetSpecification{}.Offset(offt)).SetConsumerName(consName),
    )
    // Check the error before anything can touch cons: the goroutine below
    // passes cons to storeOffset, so it must not start when creation failed.
    CheckErr(err)
    defer cons.Close()

    go func() {
        counter := 0
        for {
            select {
            case <-ctx.Done():
                return
            case data := <-acceptCh:
                counter++
                fmt.Printf("data: %s, offset: %d, deliveryTag: %v\n", string(data.msg.GetData()), data.offt, data.msg.DeliveryTag)
                if counter%5 == 0 {
                    // storeOffset prints any failure itself, so the
                    // returned error is intentionally not handled again.
                    _ = storeOffset(cons, int64(data.offt))
                }
            }
        }
    }()

    <-ctx.Done()
}

The output is as follows:

queried offset
data: hello_world_0, offset: 0, deliveryTag: []
data: hello_world_1, offset: 1, deliveryTag: []
data: hello_world_2, offset: 2, deliveryTag: []
data: hello_world_3, offset: 3, deliveryTag: []
data: hello_world_4, offset: 4, deliveryTag: []
offset stored 4

Also, as I saw in the library, the consumer routine works with an array of offsetMessages, where an offsetMessage holds a pointer to an *amqp.Message and an offset. As a result, there are two unnecessary layers of wrapping. Maybe it would be worth passing the offsetMessage to the callback, or extending amqp.Message with an offset field (similar to the DeliveryTag field in the amqp library).

Here is the code using the amqp library:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    amqp "github.com/rabbitmq/amqp091-go"
)

// CheckErr aborts the program on failure: when err is non-nil it is printed
// to standard output and the process exits with status 1. A nil err does
// nothing.
func CheckErr(err error) {
    if err == nil {
        return
    }
    fmt.Printf("%s ", err)
    os.Exit(1)
}

// main publishes the sample messages, starts a consumer on the same
// exchange, installs the close handler, and then blocks until the process
// is interrupted.
func main() {
    // Only os.Interrupt is registered: os.Kill (SIGKILL) can never be
    // trapped by a process, so listing it had no effect.
    ctx, done := signal.NotifyContext(context.Background(), os.Interrupt)
    defer done()
    produce("amqp://localhost:5672", "exch", "fanout", "key")
    c := NewConsumer("amqp://localhost:5672", "exch", "fanout", "q", "key", "tag")
    // NOTE(review): SetupCloseHandler also listens for os.Interrupt, so on
    // Ctrl+C main may return while its cleanup goroutine is still running —
    // confirm whether both listeners are intended.
    SetupCloseHandler(c)
    <-ctx.Done()
}

// produce dials the broker at uri, declares the given exchange, and
// publishes five messages ("body_0" through "body_4") to it using
// routingKey. Every failure is passed to CheckErr, which terminates the
// process. The connection is closed when the function returns.
func produce(uri, exchange, exchangeType, routingKey string) {
    conn, err := amqp.Dial(uri)
    CheckErr(err)
    defer conn.Close()

    ch, err := conn.Channel()
    CheckErr(err)
    CheckErr(ch.ExchangeDeclare(exchange, exchangeType, true, false, false, false, nil))

    for n := 0; n < 5; n++ {
        payload := fmt.Sprintf("body_%d", n)
        msg := amqp.Publishing{
            Headers:         amqp.Table{},
            ContentType:     "text/plain",
            ContentEncoding: "",
            Body:            []byte(payload),
            DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
            Priority:        0,              // 0-9
        }
        // The two false arguments are the mandatory and immediate flags.
        CheckErr(ch.Publish(exchange, routingKey, false, false, msg))
    }
}

// Consumer groups the state of one running AMQP consumer as set up by
// NewConsumer.
type Consumer struct {
    // conn is the broker connection opened by NewConsumer.
    conn    *amqp.Connection
    // channel is the channel on conn used to declare, bind, and consume.
    channel *amqp.Channel
    // tag is the consumer tag passed to Consume.
    tag     string
    // done receives a value from handle once the deliveries channel is closed.
    done    chan error
}

// SetupCloseHandler starts a goroutine that waits for os.Interrupt or
// SIGTERM. On the first signal it closes the consumer's channel and
// connection, waits for the value sent on consumer.done, prints it, and
// exits the process with status 0.
func SetupCloseHandler(consumer *Consumer) {
    signals := make(chan os.Signal, 2)
    signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

    shutdown := func() {
        <-signals
        fmt.Printf("Ctrl+C pressed in Terminal")
        consumer.channel.Close()
        consumer.conn.Close()
        // Block until the delivery handler reports that it has finished.
        fmt.Println(<-consumer.done)
        os.Exit(0)
    }
    go shutdown()
}

// NewConsumer connects to the broker at amqpURI, declares the exchange and
// the queue, binds the queue to the exchange with key, and starts consuming
// from the queue under the consumer tag ctag. Deliveries are processed by
// handle in a separate goroutine. Every setup failure is passed to CheckErr,
// which terminates the process, so the returned Consumer is always fully
// initialised.
func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) *Consumer {
    cfg := amqp.Config{Properties: amqp.NewConnectionProperties()}
    cfg.Properties.SetClientConnectionName("sample-consumer")

    conn, err := amqp.DialConfig(amqpURI, cfg)
    CheckErr(err)
    consumer := &Consumer{conn: conn, tag: ctag, done: make(chan error)}

    // Report the connection's close notification when it arrives.
    go func() {
        closed := consumer.conn.NotifyClose(make(chan *amqp.Error))
        fmt.Printf("closing: %s", <-closed)
    }()

    ch, err := conn.Channel()
    CheckErr(err)
    consumer.channel = ch

    CheckErr(ch.ExchangeDeclare(exchange, exchangeType, true, false, false, false, nil))

    q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
    CheckErr(err)

    CheckErr(ch.QueueBind(q.Name, key, exchange, false, nil))

    deliveries, err := ch.Consume(q.Name, consumer.tag, false, false, false, false, nil)
    CheckErr(err)

    go handle(deliveries, consumer.done)

    return consumer
}

// handle prints and acknowledges every delivery received from deliveries.
// An acknowledgement failure is passed to CheckErr, which terminates the
// process. Once deliveries is closed, handle prints a notice and sends nil
// on done before returning.
func handle(deliveries <-chan amqp.Delivery, done chan error) {
    defer func() {
        fmt.Printf("handle: deliveries channel closed")
        done <- nil
    }()

    for {
        delivery, open := <-deliveries
        if !open {
            return
        }
        fmt.Printf("data: %s, deliveryTag: %d\n", string(delivery.Body), delivery.DeliveryTag)
        CheckErr(delivery.Ack(false))
    }
}

Its output is as follows:

data: body_0, deliveryTag: 1
data: body_1, deliveryTag: 2
data: body_2, deliveryTag: 3
data: body_3, deliveryTag: 4
data: body_4, deliveryTag: 5

Describe the solution you'd like

Maybe it would be worth making the message API similar to the amqp library's, or extending the current amqp message with a new Offset field. A second proposal: maybe it would be worth adding the ability for users to work with a channel-based approach, as is done in the amqp library (for example, a new API method that returns a consumer channel to the user application).

Describe alternatives you've considered

No response

Additional context

No response

Gsantomaggio commented 3 days ago

Hi @HustonMmmavr, the offset is outside the message context, so this information cannot be added to the message. The other clients have the same behaviour.

Having a channel on the consumer side with all the information is correct, as you have done.

maybe it's worth to add the ability for user to interact with channel based approach

That was the intention for version 2.x of this client, but at the moment the implementation is blocked due to other priorities.

You could propose a PR if you like, but having two ways to consume messages could be confusing for users.