bsm / sarama-cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
MIT License

consumer keeps shutting itself down because its requested offset is always greater than the server's offset #204

Closed imjustfly closed 6 years ago

imjustfly commented 6 years ago

I found a consumer issue while debugging my own producer using the sarama low-level API: the consumer keeps shutting itself down because its requested offset is always greater than the server's offset.

You can find the context here:

https://github.com/Shopify/sarama/issues/1013

If OffsetDelta of a record is set, the consumer's offset will be incorrect.

Is there anything wrong? It's really weird.

dim commented 6 years ago

@imjustfly sorry, I am not really following; I would need to see the consumer code in order to help you. Are you ever marking the offset of any consumed message? The error The requested offset is outside the range of offsets maintained by the server for the given topic/partition. is usually due to the server expiring messages too quickly. Do you have any TTL/expiration set for the topic?

imjustfly commented 6 years ago

Yes, every consumed message's offset is marked. The consumer code is as simple as the example code, and I didn't set a TTL on the topic.

The situation is that the consumer keeps reporting this error from the start, even though the messages are newly produced. And in the Kafka manager, the consumer offset is always equal to log size + 1.

I believe this is related to sarama's broker produce request. If OffsetDelta of a Record is set, this error happens. The old sarama producer doesn't set the value, but a newly merged PR (https://github.com/Shopify/sarama/pull/1015) does. I don't know whether this will cause more people to hit this error.

I think you can reproduce this error by producing a batch of messages using this code (sarama's owner says this code is correct):

func produce(msgs [][]byte) error {
    // broker, topic, and partition are assumed to be set up elsewhere
    // (see the client/leader sketch below).
    batch := &sarama.RecordBatch{
        FirstTimestamp: time.Now(), // not precise
        Version:        2,          // kafka version >= 0.11, batch version is 2
        ProducerID:     -1,         // no producer id
        Codec:          sarama.CompressionNone,
        Records:        make([]*sarama.Record, 0, len(msgs)),
    }
    for i, msg := range msgs {
        batch.Records = append(batch.Records, &sarama.Record{
            Value:       msg, // each msg is a []byte
            OffsetDelta: int64(i),
        })
    }
    request := &sarama.ProduceRequest{
        RequiredAcks: sarama.WaitForAll, // all
        Timeout:      10 * 1000,
        Version:      3, // kafka version >= 0.11, request version is 3
    }
    request.AddBatch(topic, partition, batch)

    response, err := broker.Produce(request)
    if err != nil {
        return err
    }
    block := response.GetBlock(topic, partition)
    if block == nil {
        return fmt.Errorf("result for current partition not received")
    }
    if block.Err != sarama.ErrNoError {
        return block.Err
    }
    return nil
}
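
The snippet above references broker, topic, and partition from the enclosing scope; a minimal sketch of how they might be wired up (the setup function, variable names, and values here are illustrative assumptions, mirroring the scripts later in this thread):

var (
    topic     = "topic-x"    // assumed topic name
    partition = int32(0)     // assumed partition
    broker    *sarama.Broker // leader for topic/partition
)

func setup(addrs []string) error {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0 // record batches need Kafka >= 0.11

    // The client is intentionally kept open here: closing it would also
    // tear down the broker connection that produce() relies on.
    client, err := sarama.NewClient(addrs, config)
    if err != nil {
        return err
    }
    broker, err = client.Leader(topic, partition)
    return err
}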

Be sure your Kafka version is at least 0.11. Then start a consumer for the topic, and you will get this error.

Thanks for your response!

dim commented 6 years ago

@imjustfly it looks like a sarama issue to me, but I will add a test on my side to validate the assumption.

dim commented 6 years ago

I am not 100% sure about the purpose of records (I have not had time yet to look into it), nor am I aware of the right use of OffsetDelta, but I added a simple pure-sarama script:

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func run(addrs []string, topic string) error {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0
    config.Consumer.Return.Errors = true

    client, err := sarama.NewClient(addrs, config)
    if err != nil {
        return err
    }
    defer client.Close()

    broker, err := client.Leader(topic, 0)
    if err != nil {
        return err
    }
    defer broker.Close()

    req := &sarama.ProduceRequest{
        RequiredAcks: sarama.WaitForAll,
        Timeout:      10 * 1000,
        Version:      3,
    }
    req.AddBatch(topic, 0, &sarama.RecordBatch{
        FirstTimestamp: time.Now(),
        Version:        2,
        ProducerID:     -1,
        Codec:          sarama.CompressionNone,
        Records: []*sarama.Record{
            {OffsetDelta: 0, Value: []byte("DATA-0001")},
            {OffsetDelta: 1, Value: []byte("DATA-0002")},
        },
    })

    if _, err := broker.Produce(req); err != nil {
        return err
    }

    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        return err
    }
    defer consumer.Close()

    pc, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        return err
    }
    defer pc.Close()

    go func() {
        for err := range pc.Errors() {
            fmt.Println("ERR", err)
        }
    }()

    for msg := range pc.Messages() {
        fmt.Printf("MSG: %s-%d/%d %q\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
    }
    return nil
}

func main() {
    if err := run([]string{"127.0.0.1:9092"}, "topic-x"); err != nil {
        fmt.Println("FATAL", err)
    }
}

All I get is the first message, then the script hangs waiting for the second:

MSG: topic-x-0/1 "DATA-0001"

If I increment the OffsetDelta values (sketched below), I get the same error as with the cluster wrapper. As I said, I am not 100% clear on the internals of Kafka in this case, but all of this happens at the lower level.
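
For clarity, this is my reading of the incremented variant (an assumed modification of the script above; only the Records change):

// Deltas shifted up by one: 1 and 2 instead of 0 and 1. With this
// batch the consumer reportedly fails with the out-of-range error
// instead of hanging.
req.AddBatch(topic, 0, &sarama.RecordBatch{
    FirstTimestamp: time.Now(),
    Version:        2,
    ProducerID:     -1,
    Codec:          sarama.CompressionNone,
    Records: []*sarama.Record{
        {OffsetDelta: 1, Value: []byte("DATA-0001")},
        {OffsetDelta: 2, Value: []byte("DATA-0002")},
    },
})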

imjustfly commented 6 years ago

Thanks for your response. But using your code, I don't see any hanging problem.

And if I produce with your code above and consume with the sarama-cluster-cli, this error occurs:

~|⇒ ./sarama-cluster-cli -group=appier -brokers=192.168.6.151:9092 -topics=topic-x -verbose=true
2018/01/03 17:13:21 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2018/01/03 17:13:21 Initializing new client
2018/01/03 17:13:21 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2018/01/03 17:13:21 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2018/01/03 17:13:21 client/metadata fetching metadata for all topics from broker 192.168.6.151:9092
2018/01/03 17:13:21 Connected to broker at 192.168.6.151:9092 (unregistered)
2018/01/03 17:13:22 client/brokers registered new broker #2 at 192.168.6.151:9094
2018/01/03 17:13:22 client/brokers registered new broker #1 at 192.168.6.151:9093
2018/01/03 17:13:22 client/brokers registered new broker #0 at 192.168.6.151:9092
2018/01/03 17:13:22 Successfully initialized new client
2018/01/03 17:13:22 client/coordinator requesting coordinator for consumergroup appier from 192.168.6.151:9092
2018/01/03 17:13:22 client/coordinator coordinator for consumergroup appier is #0 (192.168.6.151:9092)
2018/01/03 17:13:22 client/metadata fetching metadata for all topics from broker 192.168.6.151:9092
2018/01/03 17:13:22 client/coordinator requesting coordinator for consumergroup appier from 192.168.6.151:9092
2018/01/03 17:13:22 client/coordinator coordinator for consumergroup appier is #0 (192.168.6.151:9092)
2018/01/03 17:13:22 cluster/consumer  rebalance
2018/01/03 17:13:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2018/01/03 17:13:22 Connected to broker at 192.168.6.151:9092 (registered as #0)
2018/01/03 17:13:22 cluster/consumer sarama-b5f208fd-a7a9-4aec-a104-66eeaec1e052 consume topic-x/0 from -1
2018/01/03 17:13:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2018/01/03 17:13:22 Connected to broker at 192.168.6.151:9093 (registered as #1)
2018/01/03 17:13:22 consumer/broker/1 added subscription to topic-x/0
topic-x/0/1 DATA-0001
topic-x/0/2 DATA-0002
2018/01/03 17:13:27 kafka: error while consuming topic-x/0: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
2018/01/03 17:13:27 consumer/topic-x/0 shutting down because kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
2018/01/03 17:13:27 client/metadata fetching metadata for all topics from broker 192.168.6.151:9092
2018/01/03 17:13:27 client/coordinator requesting coordinator for consumergroup appier from 192.168.6.151:9092
2018/01/03 17:13:27 client/coordinator coordinator for consumergroup appier is #0 (192.168.6.151:9092)
2018/01/03 17:13:27 cluster/consumer sarama-b5f208fd-a7a9-4aec-a104-66eeaec1e052 rebalance
2018/01/03 17:13:27 cluster/consumer sarama-b5f208fd-a7a9-4aec-a104-66eeaec1e052 consume topic-x/0 from 3
2018/01/03 17:13:27 consumer/broker/1 added subscription to topic-x/0
^C2018/01/03 17:13:36 consumer/broker/1 closed dead subscription to topic-x/0
2018/01/03 17:13:36 Closing Client

I'm using the master branch of sarama and sarama-cluster; the Kafka server is 0.11.

dim commented 6 years ago

You shouldn't need to use sarama-cluster for my code at all, but I can see this:

2018/01/03 17:13:27 kafka: error while consuming topic-x/0: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
2018/01/03 17:13:27 consumer/topic-x/0 shutting down because kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.

Let me GIST something ...

imjustfly commented 6 years ago

The key question is:

I want to produce messages like this:

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func run(addrs []string, topic string) error {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0
    config.Consumer.Return.Errors = true

    client, err := sarama.NewClient(addrs, config)
    if err != nil {
        return err
    }
    defer client.Close()

    broker, err := client.Leader(topic, 0)
    if err != nil {
        return err
    }
    defer broker.Close()

    req := &sarama.ProduceRequest{
        RequiredAcks: sarama.WaitForAll,
        Timeout:      10 * 1000,
        Version:      3,
    }
    req.AddBatch(topic, 0, &sarama.RecordBatch{
        FirstTimestamp: time.Now(),
        Version:        2,
        ProducerID:     -1,
        Codec:          sarama.CompressionNone,
        Records: []*sarama.Record{
            {OffsetDelta: 0, Value: []byte("DATA-0001")},
            {OffsetDelta: 1, Value: []byte("DATA-0002")},
        },
    })

    if _, err := broker.Produce(req); err != nil {
        return err
    }

    return nil
}

func main() {
    if err := run([]string{"192.168.6.151:9092"}, "topic-x"); err != nil {
        fmt.Println("FATAL", err)
    }
}

and I want to consume the messages with sarama-cluster:

~|⇒ ./sarama-cluster-cli -group=appier -brokers=192.168.6.151:9092 -topics=topic-x -verbose=true

Now sarama-cluster reports an offset error. Who should I report this issue to? I have no idea.

imjustfly commented 6 years ago

And I found something:

If OffsetDelta of a Record is set, the topic offset starts from 1. But if it is not set, the topic offset starts from 0.

OffsetDelta set:

~|⇒ go run test.go
MSG: topic-x-0/0 "DATA-0001"
MSG: topic-x-0/1 "DATA-0002"

OffsetDelta not set:

~|⇒ go run test.go
MSG: topic-x-0/1 "DATA-0001"
MSG: topic-x-0/2 "DATA-0002"
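
For reference, a sketch of the two Records variants being compared here (everything else in the test unchanged; the variable names are illustrative). Note that in Go an explicit OffsetDelta: 0 is identical to leaving the field unset, so the difference between the two runs is really whether the second record carries delta 1 or 0:

// "OffsetDelta set": consecutive deltas assigned explicitly.
set := []*sarama.Record{
    {OffsetDelta: 0, Value: []byte("DATA-0001")},
    {OffsetDelta: 1, Value: []byte("DATA-0002")},
}

// "OffsetDelta not set": every record keeps the zero value 0.
notSet := []*sarama.Record{
    {Value: []byte("DATA-0001")},
    {Value: []byte("DATA-0002")},
}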

dim commented 6 years ago

@imjustfly I understand your intention, but I am trying to isolate the problem by not using consumer groups and using simple consumers instead. This way, I can demonstrate that this is something that needs to be addressed on the sarama side.

Here, I have created a simple script: https://gist.github.com/dim/c3a51cde173d37dd496587ba07493a88. I am using the latest sarama master (see revision in Gopkg.toml) and Kafka 1.0.0.

Here's my output (I added some comments below):

2018/01/03 10:13:05 Initializing new client
2018/01/03 10:13:05 client/metadata fetching metadata for all topics from broker 127.0.0.1:29092
2018/01/03 10:13:05 Connected to broker at 127.0.0.1:29092 (unregistered)
2018/01/03 10:13:05 client/brokers registered new broker #0 at myhost:29092
2018/01/03 10:13:05 Successfully initialized new client
2018/01/03 10:13:05 client/metadata fetching metadata for [topic-1514974385] from broker 127.0.0.1:29092
2018/01/03 10:13:05 Connected to broker at myhost:29092 (registered as #0)

# Consume all messages since oldest. This works, all messages are retrieved.
2018/01/03 10:13:05 CONSUME 3 messages from -2
2018/01/03 10:13:05 consumer/broker/0 added subscription to topic-1514974385/0
2018/01/03 10:13:05 MESSAGE 2 "DATA-0000"
2018/01/03 10:13:05 MESSAGE 3 "DATA-0001"
2018/01/03 10:13:05 MESSAGE 4 "DATA-0002"
2018/01/03 10:13:05 consumer/broker/0 closed dead subscription to topic-1514974385/0

# Now, let's consume 1 message since offset 2 (= the first message). Works!
2018/01/03 10:13:05 CONSUME 1 messages from 2
2018/01/03 10:13:05 consumer/broker/0 added subscription to topic-1514974385/0
2018/01/03 10:13:05 MESSAGE 2 "DATA-0000"
2018/01/03 10:13:06 consumer/broker/0 closed dead subscription to topic-1514974385/0

# Now, let's consume 2 messages since offset 3 (= the 2nd and the 3rd message). Timeout!
2018/01/03 10:13:06 CONSUME 2 messages from 3
2018/01/03 10:13:06 consumer/broker/0 added subscription to topic-1514974385/0
2018/01/03 10:13:11 consumer/broker/0 closed dead subscription to topic-1514974385/0
2018/01/03 10:13:11 ERROR timeout

# Finally, let's consume 1 message since offset 4 (= the last message). Error!
2018/01/03 10:13:11 CONSUME 1 messages from 4
2018/01/03 10:13:11 ERROR kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.

2018/01/03 10:13:11 Closed connection to broker myhost:29092
2018/01/03 10:13:11 Closing Client
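
For readers who cannot reach the gist, a hypothetical sketch of what each CONSUME step could look like (the helper name, signature, and five-second timeout are assumptions based on the log output above, not the gist's actual code):

import (
    "errors"
    "log"
    "time"

    "github.com/Shopify/sarama"
)

// consumeN reads up to n messages from partition 0 starting at offset,
// and gives up after five seconds, matching the gap before the
// "ERROR timeout" lines in the log above.
func consumeN(consumer sarama.Consumer, topic string, n int, offset int64) error {
    log.Printf("CONSUME %d messages from %d", n, offset)

    pc, err := consumer.ConsumePartition(topic, 0, offset)
    if err != nil {
        return err // e.g. offset out of range, reported immediately
    }
    defer pc.Close()

    for i := 0; i < n; i++ {
        select {
        case msg := <-pc.Messages():
            log.Printf("MESSAGE %d %q", msg.Offset, msg.Value)
        case err := <-pc.Errors():
            return err
        case <-time.After(5 * time.Second):
            return errors.New("timeout")
        }
    }
    return nil
}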

imjustfly commented 6 years ago

Here is my output:

2018/01/03 18:35:27 Initializing new client
2018/01/03 18:35:27 client/metadata fetching metadata for all topics from broker 192.168.6.151:9092
2018/01/03 18:35:27 Connected to broker at 192.168.6.151:9092 (unregistered)
2018/01/03 18:35:27 client/brokers registered new broker #2 at 192.168.6.151:9094
2018/01/03 18:35:27 client/brokers registered new broker #1 at 192.168.6.151:9093
2018/01/03 18:35:27 client/brokers registered new broker #0 at 192.168.6.151:9092
2018/01/03 18:35:27 Successfully initialized new client
2018/01/03 18:35:27 Connected to broker at 192.168.6.151:9092 (registered as #0)
2018/01/03 18:35:27 CONSUME 3 messages from -2
2018/01/03 18:35:27 consumer/broker/0 added subscription to topic-h/0
2018/01/03 18:35:27 MESSAGE 2 "DATA-0000"
2018/01/03 18:35:27 MESSAGE 3 "DATA-0001"
2018/01/03 18:35:27 MESSAGE 4 "DATA-0002"
2018/01/03 18:35:28 consumer/broker/0 closed dead subscription to topic-h/0
2018/01/03 18:35:28 CONSUME 1 messages from 2
2018/01/03 18:35:28 consumer/broker/0 added subscription to topic-h/0
2018/01/03 18:35:28 MESSAGE 2 "DATA-0000"
2018/01/03 18:35:28 consumer/broker/0 closed dead subscription to topic-h/0
2018/01/03 18:35:28 CONSUME 2 messages from 3
2018/01/03 18:35:28 consumer/broker/0 added subscription to topic-h/0
2018/01/03 18:35:33 consumer/broker/0 closed dead subscription to topic-h/0
2018/01/03 18:35:33 ERROR timeout
2018/01/03 18:35:33 CONSUME 1 messages from 4
2018/01/03 18:35:33 ERROR kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
2018/01/03 18:35:33 Closed connection to broker 192.168.6.151:9092
2018/01/03 18:35:33 Closing Client

imjustfly commented 6 years ago

I think I found the reason.

If OffsetDelta is set and the requested offset is n, messages are returned from offset n+1. But if OffsetDelta is not set, messages are returned from offset n.
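
If that observation holds, the straightforward workaround on the producer side is to leave OffsetDelta untouched; a minimal sketch (my reading of this thread, not a confirmed fix):

// Build the batch without setting OffsetDelta; the field defaults to 0
// for every record, which per the observation above keeps requested
// and returned offsets aligned. msgs is assumed to be [][]byte as in
// the first snippet of this thread.
batch := &sarama.RecordBatch{
    FirstTimestamp: time.Now(),
    Version:        2,
    ProducerID:     -1,
    Codec:          sarama.CompressionNone,
    Records:        make([]*sarama.Record, 0, len(msgs)),
}
for _, msg := range msgs {
    batch.Records = append(batch.Records, &sarama.Record{Value: msg})
}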