bsm / sarama-cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
MIT License
1.01k stars 222 forks source link

commitOffset does not work properly in partition mode #294

Closed carl-leopard closed 5 years ago

carl-leopard commented 5 years ago

kafka: 1.1.0 zookeeper: 3.4.12

i call MarkOffset after handling message success, then call CommitOffset to manual commit offset, but it does not work as expected. here is the code:

func (p *KafkaConsumer) consumeByPartitionedModeAndCommit(handler func(message interface{}) error) {
    // consume partitions
    for {
        select {
        case part, ok := <-p.consumer.Partitions():
            {
                if !ok {
                    log.Errorf("%s consumer Error. part:%v, ok:%v", part, ok)
                    continue
                }

                // start a separate goroutine to consume messages
                go func(pc cluster.PartitionConsumer) {
                    for msg := range pc.Messages() {
                        log.Infof("%s comsume message. topic:%s, partition:%d, offset:%d, key:%s, val:%s", prefix, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                        if handler != nil {
                            if err := handler(*msg); err == nil {
                                p.consumer.MarkOffset(msg, "handled") // mark message as processed
                                if err := p.consumer.CommitOffsets(); err != nil {
                                    log.Errorf("%s consumer commit offsets Error: %s, topic:%s, partition:%d, offset:%d", prefix, err.Error(), msg.Topic, msg.Partition, msg.Offset)
                                } else {
                                    log.Infof("%s consumer.CommitOffsets success. topic:%s, partition:%d, offset:%d", prefix, msg.Topic, msg.Partition, msg.Offset)
                                }
                            } else {
                                log.Errorf("%s handler message(topic:%s, partition:%d, offset:%d, key:%s, val:%s) err:%s", prefix, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value, err.Error())
                            }
                        } else {
                            log.Infof("%s message handle is nil, so message is ignored.", prefix)
                            p.consumer.MarkOffset(msg, "handled") // mark message as processed
                            if err := p.consumer.CommitOffsets(); err != nil {
                                log.Errorf("%s consumer commit offsets Error: %s, topic:%s, partition:%d, offset:%d", prefix, err.Error(), msg.Topic, msg.Partition, msg.Offset)
                            } else {
                                log.Infof("%s consumer.CommitOffsets success. topic:%s, partition:%d, offset:%d", prefix, msg.Topic, msg.Partition, msg.Offset)
                            }
                        }
                    }
                }(part)
            }
        default:
            time.Sleep(time.Millisecond * 50)
        }

    }
}

from log file, it indicates that offset does commit successfully, but after a day or few days later, i receive the same message that has been handled successfully days ago.

carl-leopard commented 5 years ago

i have set the auto-commit-interval to a very large number that you can think it will never comes to

carl-leopard commented 5 years ago

image

dim commented 5 years ago

@carl-leopard sorry, but please note https://github.com/bsm/sarama-cluster#deprecation-notice

carl-leopard commented 5 years ago

thanks