confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Always getting -1001 for topicPartition offset value #37

Closed jiminoc closed 7 years ago

jiminoc commented 7 years ago

I'm trying to work on offset commit management with this library. My hope was that when I'm using the channel-based consumer and get a kafka.AssignedPartitions message (with the "latest" auto.offset.reset strategy), the offset for each topic/partition would be the starting offset I was assigned. What I'm seeing instead is that every partition returns -1001 as the offset value, no matter what I do.

When I look at the output of Burrow, it shows that the consumer group is committing offsets in what appears to be a correct manner.

Any idea what might be going on? thanks!

LOG MESSAGE THAT PRINTS OUT

topic=testtopic part=0 offset=-1001 err=<nil> low=11502464 high=12057147
topic=testtopic part=1 offset=-1001 err=<nil> low=11535834 high=15125592
topic=testtopic part=2 offset=-1001 err=<nil> low=11450986 high=14338118
topic=testtopic part=3 offset=-1001 err=<nil> low=11315825 high=13661644

BURROW OUTPUT


  "error": false,
  "message": "consumer group status returned",
  "status": {
    "cluster": "testcluster",
    "group": "mytestclient",
    "status": "OK",
    "complete": false,
    "partitions": [
      {
        "topic": "testtopic",
        "partition": 0,
        "status": "OK",
        "start": {
          "offset": 13625698,
          "timestamp": 1485219538694,
          "lag": 0
        },
        "end": {
          "offset": 13639130,
          "timestamp": 1485220774613,
          "lag": 1662449
        }
      },

// Example channel-based high-level Apache Kafka consumer
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    broker := "127.0.0.1"
    group := "mytestclient"
    topics := []string{"testtopic"}

    // signal.Notify never blocks when sending, so the channel must be buffered.
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               broker,
        "group.id":                        group,
        "session.timeout.ms":              6000,
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "latest"}})

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Consumer %v\n", c)

    if err = c.SubscribeTopics(topics, nil); err != nil {
        fmt.Fprintf(os.Stderr, "Failed to subscribe: %s\n", err)
        os.Exit(1)
    }

    run := true

    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false

        case ev := <-c.Events():
            switch e := ev.(type) {
            case kafka.AssignedPartitions:
                // Log each assigned partition together with its low/high watermarks.
                for _, tp := range e.Partitions {
                    low, high, _ := c.QueryWatermarkOffsets(*tp.Topic, tp.Partition, 3000)
                    fmt.Printf("topic=%s part=%d offset=%d err=%v low=%d high=%d\n", *tp.Topic, tp.Partition, tp.Offset, tp.Error, low, high)
                }
                c.Assign(e.Partitions)
            case kafka.RevokedPartitions:
                fmt.Fprintf(os.Stderr, "%% %v\n", e)
                c.Unassign()
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}
mhowlett commented 7 years ago

My understanding is that what you are seeing is by design: the AssignedPartitions event only tells you the topic/partition assignment, and nothing about offsets. I believe the reason e.Partitions carries an offset value of -1001 alongside each topic/partition is so that you can pass e.Partitions directly to the Assign method, which requires an offset for every partition. -1001 (kafka.OffsetInvalid) is a special value that instructs the consumer to start reading from the last committed offset or, if there is no committed offset, from the position determined by the auto.offset.reset configuration parameter.
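The start-position rule described above can be modeled in a few lines. This is a minimal, stdlib-only sketch and not confluent-kafka-go code: `resolveStart` is a hypothetical function, and it deliberately ignores edge cases such as a committed offset that has fallen out of range or the "smallest"/"largest" synonyms for auto.offset.reset. The sentinel values match librdkafka's `RD_KAFKA_OFFSET_*` constants.

```go
package main

import "fmt"

// Sentinel offsets. The values mirror librdkafka's RD_KAFKA_OFFSET_*
// constants, which confluent-kafka-go exposes as kafka.OffsetBeginning,
// kafka.OffsetEnd, kafka.OffsetStored and kafka.OffsetInvalid.
const (
	OffsetBeginning int64 = -2
	OffsetEnd       int64 = -1
	OffsetStored    int64 = -1000
	OffsetInvalid   int64 = -1001
)

// resolveStart is a hypothetical, simplified model (not library code) of how
// a consumer chooses where to start reading an assigned partition.
//   assigned:  offset passed to Assign (-1001 when taken from AssignedPartitions)
//   committed: the group's committed offset, or OffsetInvalid if there is none
//   reset:     auto.offset.reset ("earliest" or "latest")
//   low, high: the partition's low and high watermarks
func resolveStart(assigned, committed int64, reset string, low, high int64) int64 {
	switch {
	case assigned >= 0:
		return assigned // an explicit offset passed to Assign wins
	case assigned == OffsetBeginning:
		return low
	case assigned == OffsetEnd:
		return high
	}
	// OffsetInvalid or OffsetStored: resume from the committed offset.
	if committed >= 0 {
		return committed
	}
	// No committed offset for the group: fall back to auto.offset.reset.
	if reset == "earliest" {
		return low
	}
	return high // "latest"
}

func main() {
	// AssignedPartitions hands us OffsetInvalid, so the committed offset decides.
	fmt.Println(resolveStart(OffsetInvalid, 180, "latest", 100, 250))           // 180
	fmt.Println(resolveStart(OffsetInvalid, OffsetInvalid, "latest", 100, 250)) // 250
}
```

The point of the model is that the -1001 in the assignment is an instruction, not a position; the actual start offset is only known once the consumer begins fetching.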

On Mon, Jan 23, 2017 at 5:49 PM, Jim Plush notifications@github.com wrote:

jiminoc commented 7 years ago

thanks Matt, let me paint the scenario we're trying to solve to see if you have any suggestions on how we might go about it.

Our goal is to ensure all messages are fully processed before committing our offset forward. At high volume you don't want to commit on every message, so we'd like to use a ticker that, say, every 2 seconds commits our highest processed offset. The problem with that approach is that there's no guarantee all the messages before that offset have been processed unless you track them from your starting offset, which it doesn't seem we'll be able to get.

Let's say a new consumer starts up, set to latest offsets, and the latest offset for that topic is 100. It starts getting messages for topic/partition Topic1Partition0 with offsets 101, 102, etc., up to 110. We successfully process message 110 just as the ticker goes off. We know that 110 is our high offset, but we haven't finished 101-109, so we can't move our pointer ahead to 110 yet and we skip committing. The ticker goes off again 2 seconds later; by then we know we've done 101-110, so we can commit everything through offset 110. If we then stop the consumer and restart it, the AssignedPartitions call doesn't tell us that we're now resuming after 110.

A workaround off the top of my head: since messages within a partition are ordered, we can use the first offset we get on startup for each topic/partition as our starting value, and from there tell when we have a contiguous range of offsets that is safe to commit.
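The workaround above (take the first offset received for each topic/partition as the base, and commit only once a contiguous run from that base has been processed) might look like the following sketch. `rangeTracker` and its methods are hypothetical. It follows Kafka's convention that the committed offset is the next offset to consume, so after processing 101 through 110 it reports 111. It assumes offsets in the partition are contiguous; on a compacted topic the log can have gaps, and this tracker would stall at the first missing offset.

```go
package main

import "fmt"

// rangeTracker is a hypothetical per-partition helper for the "first offset
// is the base" approach. It assumes offsets in the partition are contiguous.
type rangeTracker struct {
	started bool
	base    int64          // first offset received after assignment
	next    int64          // lowest offset not yet known to be processed
	done    map[int64]bool // processed offsets that are >= next
}

func newRangeTracker() *rangeTracker {
	return &rangeTracker{done: make(map[int64]bool)}
}

// Received records a delivered message; the first call fixes the base.
// It must be called before Processed for the same offset.
func (r *rangeTracker) Received(offset int64) {
	if !r.started {
		r.started = true
		r.base = offset
		r.next = offset
	}
}

// Processed marks an offset as fully handled; completion may be out of order.
func (r *rangeTracker) Processed(offset int64) {
	r.done[offset] = true
	for r.done[r.next] { // advance over the contiguous processed prefix
		delete(r.done, r.next)
		r.next++
	}
}

// Committable returns the offset to commit (the next offset to consume) and
// false if no contiguous progress has been made since the base.
func (r *rangeTracker) Committable() (int64, bool) {
	if !r.started || r.next == r.base {
		return 0, false
	}
	return r.next, true
}

func main() {
	r := newRangeTracker()
	for o := int64(101); o <= 110; o++ {
		r.Received(o)
	}
	r.Processed(110)             // first tick: 110 done, 101-109 still in flight
	fmt.Println(r.Committable()) // 0 false
	for o := int64(101); o <= 109; o++ {
		r.Processed(o)
	}
	fmt.Println(r.Committable()) // 111 true
}
```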

Unless you know of another approach that might work better? thanks!

mhowlett commented 7 years ago

librdkafka has a function, rd_kafka_committed (and the very similar rd_kafka_position), which will tell you the committed offsets for a list of topic/partitions, but I can't see it exposed in the Go client.

What you suggest sounds correct: the first message received for each topic/partition will give you the start offset, and I think you should be able to do what you want with this pretty easily. @edenhill may have some more input when his timezone allows...

ewencp commented 7 years ago

@jiminoc Are you doing async processing of messages? That's the only case you would need to do the type of tracking you're talking about. Many (most?) Kafka applications are written to process messages one at a time, serially. Having to interact with a slow, external system is usually where this assumption of serial processing can break down.
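For the serial case, the bookkeeping collapses to "last processed offset + 1". A minimal sketch, assuming a hypothetical `message` type standing in for `*kafka.Message`, and a message count (`commitEvery`) in place of a 2-second ticker so the example is deterministic:

```go
package main

import "fmt"

// message is a hypothetical stand-in for *kafka.Message; only the offset matters here.
type message struct {
	Offset int64
}

// consumeSerially processes messages one at a time. Because each message is
// finished before the next one starts, the offset to commit is always the
// last processed offset + 1, so no extra tracking is needed.
// commitEvery (must be > 0) stands in for a periodic ticker.
func consumeSerially(msgs []message, process func(message), commitEvery int, commit func(int64)) {
	for i, m := range msgs {
		process(m)
		if (i+1)%commitEvery == 0 || i == len(msgs)-1 {
			commit(m.Offset + 1)
		}
	}
}

func main() {
	msgs := []message{{101}, {102}, {103}, {104}, {105}}
	consumeSerially(msgs, func(message) {}, 2, func(o int64) {
		fmt.Println("commit", o) // commit 103, commit 105, commit 106
	})
}
```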

If you are processing asynchronously, you'll need to track when it is safe to commit offsets, but usually you can do that just by tracking the smallest unacked message. You don't usually need to know the starting offset beforehand: you just poll() for the first message, and its offset will be the first offset you need to worry about.
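Tracking the smallest unacked message could be sketched as follows; the `unackedTracker` type is hypothetical, with one instance per topic/partition. Because it only considers offsets it has actually received, it needs no starting offset and is not confused by gaps in the log (for example, offsets removed by compaction).

```go
package main

import "fmt"

// unackedTracker is a hypothetical per-partition helper for asynchronous
// processing: everything before the smallest unacked offset is safe to commit.
type unackedTracker struct {
	pending []int64        // received, not-yet-retired offsets in delivery order
	acked   map[int64]bool // acked offsets still present in pending
	last    int64          // highest offset received so far, -1 if none
}

func newUnackedTracker() *unackedTracker {
	return &unackedTracker{acked: make(map[int64]bool), last: -1}
}

// Received must be called in delivery order (offsets ascend within a partition).
func (u *unackedTracker) Received(offset int64) {
	u.pending = append(u.pending, offset)
	u.last = offset
}

// Ack marks an offset as processed and retires the acked prefix of pending.
func (u *unackedTracker) Ack(offset int64) {
	u.acked[offset] = true
	for len(u.pending) > 0 && u.acked[u.pending[0]] {
		delete(u.acked, u.pending[0])
		u.pending = u.pending[1:]
	}
}

// CommitOffset returns the offset that is safe to commit (the next offset to
// consume): the smallest unacked offset, or last+1 when everything is acked.
func (u *unackedTracker) CommitOffset() (int64, bool) {
	if len(u.pending) > 0 {
		return u.pending[0], true
	}
	if u.last >= 0 {
		return u.last + 1, true
	}
	return 0, false // nothing received yet
}

func main() {
	u := newUnackedTracker()
	for _, o := range []int64{10, 12, 13} { // offset 11 does not exist in the log
		u.Received(o)
	}
	u.Ack(12)
	fmt.Println(u.CommitOffset()) // 10 true: 10 is still in flight
	u.Ack(10)
	fmt.Println(u.CommitOffset()) // 13 true
	u.Ack(13)
	fmt.Println(u.CommitOffset()) // 14 true
}
```

In a real consumer, `Received` would be called from the poll/event loop and `Ack` from the workers, so the tracker would need a mutex or a single owning goroutine; the periodic ticker would then commit whatever `CommitOffset` reports.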

There's also a slightly different way to handle commits. Rather than committing safe offsets exactly every 2 seconds, you can track unacked messages and, every 2 seconds, note what is still outstanding, wait for it to flush, and then commit the offset you were at when you initiated the process. This makes the accounting a little simpler (you just need a map to remove acked messages from, and you commit once the map is empty). However, to let processing continue while waiting for the outstanding data to flush (i.e. so you don't stall), you need 2 lists of outstanding messages: 1 for ongoing processing and 1 tracking the messages that were outstanding when the last commit was initiated. This is how Kafka Connect manages commits for source connectors. It avoids having to also maintain a data structure that can efficiently tell you the minimum safe offset to commit.
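The two-list scheme described above could look roughly like the following single-goroutine sketch with no locking. `generationCommitter` is a hypothetical name; `StartCommit` would be driven by the 2-second ticker, and the `commit` callback stands in for whatever actually commits offsets.

```go
package main

import "fmt"

// generationCommitter is a hypothetical sketch of the "snapshot and flush"
// commit scheme: starting a commit freezes the current outstanding set and
// the position reached so far; new messages go into a fresh set, and the
// recorded position is committed once the frozen set drains.
type generationCommitter struct {
	current  map[int64]bool // outstanding messages received since the last snapshot
	flushing map[int64]bool // outstanding messages covered by the pending commit; nil if none
	pos      int64          // next offset after the last received message (0 = none yet)
	target   int64          // offset to commit once flushing drains
	commit   func(int64)
}

func newGenerationCommitter(commit func(int64)) *generationCommitter {
	return &generationCommitter{current: make(map[int64]bool), commit: commit}
}

func (g *generationCommitter) Received(offset int64) {
	g.current[offset] = true
	g.pos = offset + 1
}

func (g *generationCommitter) Ack(offset int64) {
	delete(g.current, offset)
	if g.flushing != nil {
		delete(g.flushing, offset)
		g.maybeFinish()
	}
}

// StartCommit snapshots the outstanding set; it returns false if nothing has
// been received yet or a previous commit is still waiting to flush.
func (g *generationCommitter) StartCommit() bool {
	if g.flushing != nil || g.pos == 0 {
		return false
	}
	g.flushing, g.current = g.current, make(map[int64]bool)
	g.target = g.pos
	g.maybeFinish()
	return true
}

func (g *generationCommitter) maybeFinish() {
	if len(g.flushing) == 0 {
		g.flushing = nil
		g.commit(g.target)
	}
}

func main() {
	g := newGenerationCommitter(func(o int64) { fmt.Println("commit", o) })
	g.Received(101)
	g.Received(102)
	g.StartCommit() // ticker fires: snapshot {101, 102}, target 103
	g.Received(103) // lands in the new generation
	g.Ack(103)      // does not affect the pending commit
	g.Ack(101)
	g.Ack(102) // snapshot drained: prints "commit 103"
}
```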