IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Topic retention settings cause consumer to skip new messages that arrive after deleted messages #2898

Closed · bentcoder closed this issue 1 month ago

bentcoder commented 1 month ago
Description

Hi,

It will probably be clearer if I write the steps in order, so here they are:

  1. In the producer app, I set the message retention to 10000 ms at topic creation and run it. The consumer app is not running yet.
  2. The producer app sends 3 messages to the topic.
  3. I let 5 minutes go by so the messages are automatically removed from the topic. The topic has 0 messages now, which is as expected.
  4. I produce 3 messages again. The topic has 3 messages now.
  5. I run the consumer app straight after, but it doesn't see/consume those pending messages (I can see them in the UI)! WHY?
  6. I produce 3 messages again. The topic has 6 messages now.
  7. Because the consumer is already running, it consumes these 3 new messages!

So technically what's happening here is that when messages are deleted from the topic due to the retention settings, the consumer will not see new messages that were added after the deleted ones. This happens if I start the consumer after adding the new messages; if I start it in the middle and then send messages, it consumes them.

This issue doesn't exist if I don't set those two retention entries in ConfigEntries.

Versions
Sarama: v1.43.2
Kafka: v3.3.2
Go: 1.22
Configuration

Producer app

Create topic.

// adm is a sarama.ClusterAdmin created earlier.
ms := "10000"

err := adm.CreateTopic("any-day", &sarama.TopicDetail{
    NumPartitions:     1,
    ReplicationFactor: 1,
    ConfigEntries: map[string]*string{
        "retention.ms":        &ms,
        "delete.retention.ms": &ms,
    },
}, false)

Send message.

msg := sarama.ProducerMessage{
    Topic:     "any-day",
    Value:     sarama.StringEncoder(data),
    Key:       sarama.StringEncoder(item.ID),
    Timestamp: time.Now().UTC(),
}

part, offs, err := syncProducer.SendMessage(&msg)

Consumer app

type Kafka struct {
    addrs  []string       // broker addresses
    config *sarama.Config // consumer configuration
    // ...
}

func (k Kafka) CreateConsumerGroup(topic string) error {
    grp, err := sarama.NewConsumerGroup(k.addrs, topic+"-group", k.config)
    if err != nil {
        return err
    }

    go func() {
        for err := range grp.Errors() {
            fmt.Println("ERROR", err)
        }
    }()

    cons := AnyDay{}

    for {
        err := grp.Consume(context.Background(), []string{topic}, cons)
        if err != nil {
            fmt.Printf("error from consumer: %v", err)
        }
    }
}

type AnyDay struct{}

func (AnyDay) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (AnyDay) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (AnyDay) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    i := 1
    for msg := range claim.Messages() {
        par := msg.Partition
        off := msg.Offset
        top := msg.Topic
        key := msg.Key

        fmt.Printf("%d: PAR: %d OFF: %d TOP: %s KEY: %s > %s\n", i, par, off, top, key, string(msg.Value))

        sess.MarkMessage(msg, "")

        i++
    }

    return nil
}
puellanivis commented 1 month ago

This sort of looks like what I would expect if you had Config.Consumer.Offsets.Initial == OffsetNewest. When the consumer joins the topic, it would then wait for newer messages rather than reading from the start. Relevant comments on the field from the godoc:

// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64
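Based on that field, a minimal sketch of the change (assuming the consumer's k.config is built with sarama.NewConfig(); field names per sarama v1.43):

```go
config := sarama.NewConfig()
// With no previously committed offset for the group, OffsetNewest (the
// default) makes the consumer start at the end of the log, skipping any
// messages produced before it joined. OffsetOldest starts from the
// earliest retained offset instead.
config.Consumer.Offsets.Initial = sarama.OffsetOldest
```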
bentcoder commented 1 month ago

I actually changed it to OffsetOldest and it has been working fine since, but I wasn't sure whether this was, in fact, the solution, hence I wanted to wait for a comment.

puellanivis commented 1 month ago

Great news. :) Good to get feedback that the first thing that came to mind was indeed the issue. 👍

dnwe commented 1 month ago

Yep, the described behaviour matches what you'd expect a consumer to do if it starts as a new group without any committed offset(s) and a config setting of sarama.OffsetNewest
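The skip can be illustrated without a broker. The sketch below is a hypothetical, stdlib-only model of how the starting offset is resolved for a group with no committed offset; pickStartOffset and the example offsets are invented for illustration and are not sarama APIs:

```go
package main

import "fmt"

// pickStartOffset models how a consumer's starting position is resolved
// when its group has no committed offset. newest=true corresponds to
// sarama.OffsetNewest (start at the log-end offset); false corresponds
// to sarama.OffsetOldest (start at the earliest retained offset).
func pickStartOffset(logStart, logEnd int64, newest bool) int64 {
	if newest {
		return logEnd
	}
	return logStart
}

func main() {
	// Retention has deleted offsets 0-2; offsets 3-5 are pending in the log.
	logStart, logEnd := int64(3), int64(6)

	fmt.Println(pickStartOffset(logStart, logEnd, true))  // 6: starts past the 3 pending messages
	fmt.Println(pickStartOffset(logStart, logEnd, false)) // 3: consumes the pending messages
}
```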

bentcoder commented 1 month ago

Excellent, thank you!