lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Configuring Sarama's initial offset to OffsetOldest leads the existing group to reprocess topic messages from start #441

Closed: homier closed this issue 10 months ago

homier commented 10 months ago

Hi everyone,

We started experiencing an issue that leads to reprocessing topic messages from the start after we changed Sarama's initial offset policy from OffsetNewest to OffsetOldest. At first we thought the problem was caused by switching from one policy to the other, but it turns out it occurs only with the OffsetOldest policy.

I wrote a simple script that consumes messages from a topic with the OffsetOldest policy and prints them to stdout (attached to this issue). I produced 10 messages, started the worker, and once all the messages were consumed, the consumer committed the latest offset to the consumer group, which is the behavior I expected. If you stop the consumer with Ctrl-C and restart it immediately, it resumes reading from the latest committed offset of the group, and everything is fine.

But if you wait several minutes before starting the consumer again, the offset is reset to the initial value (-2, i.e. OffsetOldest, in Sarama's terms) and all the messages are reprocessed.
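
For reference, here is a minimal sketch of how the ten test messages can be produced with a goka emitter. It assumes the same broker address (localhost:9092) and stream name (goka-test) as the script below; the keys and payloads are just illustrative.

package main

import (
    "fmt"
    "log"

    "github.com/lovoo/goka"
    "github.com/lovoo/goka/codec"
)

func main() {
    // Emitter writing to the same stream the processor below consumes.
    emitter, err := goka.NewEmitter([]string{"localhost:9092"}, goka.Stream("goka-test"), new(codec.String))
    if err != nil {
        log.Fatal(err)
    }
    defer emitter.Finish()

    // Emit ten string messages, keyed by their index.
    for i := 0; i < 10; i++ {
        key := fmt.Sprintf("key-%d", i)
        value := fmt.Sprintf("message-%d", i)
        if err := emitter.EmitSync(key, value); err != nil {
            log.Fatal(err)
        }
    }
}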

In our case this is a critical issue: reprocessing means consuming several million messages, which takes several hours because of the computations we have to run for every single message.

The issue seems related to this one: https://github.com/IBM/sarama/issues/2036. One more question: should I open the same issue against Sarama?

Go version: go1.21.1 linux/amd64
Sarama version: github.com/IBM/sarama v1.41.2
Goka version: github.com/lovoo/goka v1.1.9

package main

import (
    "context"
    "fmt"

    "github.com/IBM/sarama"
    "github.com/lovoo/goka"
)

// Codec encodes and decodes message payloads as plain strings.
type Codec struct{}

func (Codec) Decode(data []byte) (any, error) {
    return string(data), nil
}
func (Codec) Encode(data any) ([]byte, error) {
    return []byte(data.(string)), nil
}

func main() {
    var codec Codec

    // Consume the "goka-test" stream and handle each message with `handle`.
    group := goka.DefineGroup("goka-test", goka.Input(goka.Stream("goka-test"), codec, handle))

    conf := goka.DefaultConfig()

    // Everything is fine when using `sarama.OffsetNewest`
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest

    proc, err := goka.NewProcessor(
        []string{"localhost:9092"}, group,
        goka.WithConsumerGroupBuilder(goka.ConsumerGroupBuilderWithConfig(conf)),
        goka.WithLogger(goka.DefaultLogger()),
    )

    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Run the processor in the background and report its terminal error.
    errCh := make(chan error, 1)
    go func() {
        defer close(errCh)
        errCh <- proc.Run(ctx)
    }()

    select {
    case err := <-errCh:
        if err != nil {
            panic(err)
        }
    case <-ctx.Done():
    }
}

func handle(ctx goka.Context, data any) {
    fmt.Println(data)
}

frairon commented 10 months ago

Hi @homier,

To be honest, I can't reproduce the behavior at all; messages are never reconsumed, however long I wait. My guess is that it's a configuration issue with Kafka's consumer-offsets topic, so not related to goka or Sarama. I tried it with the Kafka cluster set up by docker-compose in the examples folder, and everything worked as expected.

Did you use the same cluster? If not, and you have access to the Kafka tools in your cluster, check the stored offsets for the test group. Maybe they're reset for some reason after a while:

kafka-consumer-groups --bootstrap-server <broker> --group goka-test --describe --offsets

homier commented 10 months ago

Hi @frairon ,

Thanks for the answer! We spent last week trying to figure out what happens in Kafka when this issue occurs.

It turned out that the consumer group was deleted by Kafka about 10 minutes after the processor was shut down. We tried different Kafka versions until we realized the problem was in Sarama's group configuration.

The issue is fixed by upgrading Sarama to v1.42.1.
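
For anyone hitting the same problem, the dependency can be bumped explicitly (assuming a standard Go modules setup):

go get github.com/IBM/sarama@v1.42.1
go mod tidy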

frairon commented 10 months ago

Alright, that's odd :) But good to know, thanks for the feedback!