segmentio / kafka-go

Kafka library in Go
MIT License

Offset is going ahead and we are missing message with lag #1276

Open NitinHsharma opened 6 months ago

NitinHsharma commented 6 months ago

We are using the ReadMessage function with a consumer group, which works well most of the time. Sometimes, however, the offset of one partition gets set ahead of its committed message(s), so the messages in between get stuck in Kafka. No reader is able to get those messages until we restart the pods, which forcefully rebalances the consumer group.

Below is the basic code we use to consume the messages:

l := log.New(log.Writer(), "kafka reader: ", log.LstdFlags)
Reader = kafka.NewReader(kafka.ReaderConfig{
    Brokers: kafkaDetails.BrokerAddress,
    Topic:   kafkaDetails.Topic,
    Dialer:  readerDialer,
    Logger:  l,
    GroupID: kafkaDetails.GroupID,
})
msg, err := Reader.ReadMessage(ctx)
if err != nil {
    logger.Log.Error("could not read message " + err.Error())
    return nil, err
}
return msg.Value, nil

Below are the corresponding logs:

kafka reader: 2024/03/20 08:25:56 stopped heartbeat for group MY_GROUP_NAME
kafka reader: 2024/03/20 08:25:56 stopped commit for group MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:02 no messages received from kafka within the allocated time for partition 0 of MY_TOPIC_NAME at offset 113: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:23 joined group MY_GROUP_NAME as member MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101 in generation 35
kafka reader: 2024/03/20 08:26:23 selected as leader for group, MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 using 'range' balancer to assign group, MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-g2q2r (github.com/segmentio/kafka-go)-058267e0-bc25-4458-96a2-2deb51ede6d4/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xxqtp (github.com/segmentio/kafka-go)-e2de8250-d309-4e75-b53f-4fe301d9f4be/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-ttfs4 (github.com/segmentio/kafka-go)-993b3b28-30dc-4178-8193-2dc0ae1504d0/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-zdl5b (github.com/segmentio/kafka-go)-2a68df5f-0923-4ef0-8799-04d59f5262db/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xqw56 (github.com/segmentio/kafka-go)-7000ed30-619c-4083-bc3b-5292fbcdc21f/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/0
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/5
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/1
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/4
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/2
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/3
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-g2q2r (github.com/segmentio/kafka-go)-058267e0-bc25-4458-96a2-2deb51ede6d4/MY_TOPIC_NAME/[0]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101/MY_TOPIC_NAME/[5]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-ttfs4 (github.com/segmentio/kafka-go)-993b3b28-30dc-4178-8193-2dc0ae1504d0/MY_TOPIC_NAME/[1]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xqw56 (github.com/segmentio/kafka-go)-7000ed30-619c-4083-bc3b-5292fbcdc21f/MY_TOPIC_NAME/[4]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xxqtp (github.com/segmentio/kafka-go)-e2de8250-d309-4e75-b53f-4fe301d9f4be/MY_TOPIC_NAME/[2]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-zdl5b (github.com/segmentio/kafka-go)-2a68df5f-0923-4ef0-8799-04d59f5262db/MY_TOPIC_NAME/[3]
kafka reader: 2024/03/20 08:26:23 joinGroup succeeded for response, MY_GROUP_NAME.  generationID=35, memberID=MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101
kafka reader: 2024/03/20 08:26:23 Joined group MY_GROUP_NAME as member MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101 in generation 35
kafka reader: 2024/03/20 08:26:23 Syncing 6 assignments for generation 35 as member MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101
kafka reader: 2024/03/20 08:26:23 sync group finished for group, MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 started heartbeat for group, MY_GROUP_NAME [3s]
kafka reader: 2024/03/20 08:26:23 subscribed to topics and partitions: map[{topic:MY_TOPIC_NAME partition:5}:78]
kafka reader: 2024/03/20 08:26:23 initializing kafka reader for partition 5 of MY_TOPIC_NAME starting at offset 78
kafka reader: 2024/03/20 08:26:23 started commit for group MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 the kafka reader for partition 5 of MY_TOPIC_NAME is seeking to offset 78
kafka reader: 2024/03/20 08:26:32 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:41 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:50 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:59 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:08 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:17 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:23 committed offsets for group MY_GROUP_NAME: 
    topic: MY_TOPIC_NAME
        partition 5: 79
kafka reader: 2024/03/20 08:27:32 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:41 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:50 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:59 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:08 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:17 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:26 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:35 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:46 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:55 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:04 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:13 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:22 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:31 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:40 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:49 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:58 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:07 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:16 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:25 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:

As the end of the logs shows, the library committed offset 79 on partition 5, but the reader then somehow moved on to offset 80. This is what causes the lag of 1 message in Kafka.
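To make the arithmetic behind that observation explicit, here is a minimal sketch of the offsets involved. The committed offset (79) and the reader position (80) come from the logs above; the log end offset of 80 is an assumption inferred from the reported lag of 1 message, and the `partitionState` type is purely illustrative, not part of kafka-go.

```go
package main

import "fmt"

// partitionState is a hypothetical helper type (not a kafka-go type) that
// holds the three offsets relevant to the report above.
type partitionState struct {
	Committed int64 // offset committed for the group, i.e. the next offset to consume
	Position  int64 // offset the reader is currently trying to fetch
	LogEnd    int64 // next offset the broker will assign (assumed, not in the logs)
}

// Lag is the number of messages between the committed offset and the log end.
func (p partitionState) Lag() int64 { return p.LogEnd - p.Committed }

// Uncommitted is the number of messages the reader has moved past
// without a matching commit.
func (p partitionState) Uncommitted() int64 { return p.Position - p.Committed }

func main() {
	// Values for partition 5 taken from the logs, except LogEnd (assumed).
	p := partitionState{Committed: 79, Position: 80, LogEnd: 80}
	fmt.Printf("lag=%d uncommitted=%d\n", p.Lag(), p.Uncommitted())
}
```

Under these assumptions both numbers are 1: the reader is waiting at offset 80 while the group's committed offset still points at 79.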

ahmedyusef9 commented 6 months ago

Hi, is there any suggested solution for now?

ahmedyusef9 commented 6 months ago

Actually, for now: fetch and commit manually.

go func() {
    for {
        // Fetch without auto-committing the offset.
        msg, err := ki.Reader.FetchMessage(context.Background())
        if err != nil {
            vlog.Errorf("Error reading message: %v", err)
            continue
        }
        // TODO: process msg here

        // Commit the message after processing
        if err := ki.Reader.CommitMessages(context.Background(), msg); err != nil {
            vlog.Errorf("Error committing message: %v", err)
        }
    }
}()

shcw commented 4 months ago

I am also experiencing the same issue, and I am unaware of its cause.

nachogiljaldo commented 3 months ago

@NitinHsharma Do you have a reproducer? Or is there any factor you saw that causes this to happen more often?

NitinHsharma commented 1 month ago

@nachogiljaldo No, it is random. One more observation I made today: if I have fewer consumer pods than partitions, a single consumer pod takes multiple partitions to read, but it keeps consuming from only one of them because there is continuous traffic on the Kafka topic. So the lag on the other partition keeps increasing until I forcefully add one more consumer pod. For example, I have a topic with 10 partitions and 9 consumer pods, so one random consumer pod, say pod number 8, is assigned 2 partitions but reads from only 1 of them. My expectation and understanding is that it should read from both in a round-robin manner to distribute the load.
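The round-robin behavior described as the expectation can be sketched as follows. This only illustrates the expected interleaving and says nothing about how kafka-go schedules partition readers internally; the `roundRobin` function, the partition numbers, and the message labels are all made up for the example.

```go
package main

import "fmt"

// roundRobin takes one message from each partition queue in turn, following
// the given partition order, until every queue is empty. It consumes the
// slices stored in queues as it goes.
func roundRobin(queues map[int][]string, order []int) []string {
	var out []string
	for {
		progressed := false
		for _, p := range order {
			if q := queues[p]; len(q) > 0 {
				out = append(out, q[0])
				queues[p] = q[1:]
				progressed = true
			}
		}
		if !progressed {
			return out // all queues drained
		}
	}
}

func main() {
	// Hypothetical pod with two assigned partitions: one busy, one quieter.
	queues := map[int][]string{
		8: {"p8-m0", "p8-m1", "p8-m2"},
		9: {"p9-m0"},
	}
	fmt.Println(roundRobin(queues, []int{8, 9}))
}
```

With this scheme the message on the quieter partition is delivered second rather than waiting behind the busy partition's backlog.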

nachogiljaldo commented 3 weeks ago

Just for confirmation, do you think this could be related to rebalances (i.e. there is a rebalance with a pending async commit that sets it to an offset older than the one you had)? Something like this: https://github.com/segmentio/kafka-go/issues/1308

NitinHsharma commented 2 weeks ago

Yes, it could be.