Closed jbkc85 closed 7 years ago
It is something you can do directly through the sarama API; please take a look at https://godoc.org/github.com/Shopify/sarama#Broker.GetAvailableOffsets and https://godoc.org/github.com/Shopify/sarama#Broker.FetchOffset. But, to be honest, you are probably better off using the bundled Kafka command line tools; please see my comment here: https://github.com/bsm/sarama-cluster/issues/141#issuecomment-311894704
@dim, thanks for the comments. I really wanted to avoid going through the command line tools, in hopes of exposing metrics and giving individuals a CLI tool to dig into it from their laptops. I have what I believe is a working prototype, but for some reason one of the offsets passed back just doesn't ever update. Would you mind giving me a pointer, if possible? I have been working on it for about 2 days now with no luck...
for _, partition := range myPartitions {
	var partitionBroker *sarama.Broker
	partitionBroker, err = clientConn.Leader(topic, partition)
	if err != nil {
		log.Printf("[ERROR] Topic %s Partition %v doesn't seem to have a Leader (err: %s)", topic, partition, err)
	}
	defer partitionBroker.Close()
	offsetFetchRequest := &sarama.OffsetRequest{}
	offsetFetchRequest.AddBlock(topic, partition, time.Now().UnixNano(), sarama.MaxResponseSize)
	offsets, _ := partitionBroker.GetAvailableOffsets(offsetFetchRequest)
	for _, offsetValues := range offsets.Blocks {
		for _, value := range offsetValues {
			//"GROUP", "TOPIC:PARTITION#", "CURRENT_OFFSET", "LOG_END_OFFSET", "LAG"
			if len(value.Offsets) == 0 {
				table.AddRow("", fmt.Sprintf("%s:%v", topic, partition), "-", "-", "-")
			} else if len(value.Offsets) == 1 {
				table.AddRow("", fmt.Sprintf("%s:%v", topic, partition), value.Offsets[0], value.Offsets[0], 0)
			} else {
				table.AddRow("", fmt.Sprintf("%s:%v", topic, partition), value.Offsets[1], value.Offsets[0], value.Offsets[0]-value.Offsets[1])
			}
		}
	}
}
I then tried FetchOffset using a specific ConsumerGroup, but continuously get a '-1' from the offset fetch. Am I doing something wrong with this code? Also, if you have time, is there any way you can explain to me what I'm doing and why I am misunderstanding this approach to Kafka via Sarama?
Thanks if you have the time, and thanks for your first comment as well =).
As @dim said, this is a general sarama kafka client question, not a sarama-cluster question.
Looking at your code, the first issue I see is that you're passing a nanosecond timestamp to Kafka. Kafka usually takes millisecond timestamps. You should look at the specification of the message you're sending to be certain what you need to send. kafka.apache.org has a nice page somewhere with all the message details if you don't care to read the Java source code.
Also there's a sarama API for what you're doing: sarama.Client.GetOffset(). You can read that source.
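A rough sketch of how that call could be used to collect the log-end offset of every partition. To keep it runnable without a broker, it is written against a small interface, `offsetClient`, that lists only the two methods it needs. `sarama.Client` has `Partitions` and `GetOffset` methods with these signatures, so a real client should be usable in its place. The `fakeClient` type, the `logEndOffsets` helper, and the topic name are all made up for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// offsetNewest has the same value as sarama.OffsetNewest.
const offsetNewest int64 = -1

// offsetClient is the subset of sarama.Client this sketch relies on.
type offsetClient interface {
	Partitions(topic string) ([]int32, error)
	GetOffset(topic string, partitionID int32, time int64) (int64, error)
}

// logEndOffsets asks for the newest offset of each partition of topic.
func logEndOffsets(c offsetClient, topic string) (map[int32]int64, error) {
	partitions, err := c.Partitions(topic)
	if err != nil {
		return nil, fmt.Errorf("listing partitions of %s: %w", topic, err)
	}
	ends := make(map[int32]int64, len(partitions))
	for _, p := range partitions {
		end, err := c.GetOffset(topic, p, offsetNewest)
		if err != nil {
			return nil, fmt.Errorf("offset of %s:%d: %w", topic, p, err)
		}
		ends[p] = end
	}
	return ends, nil
}

// fakeClient stands in for a real client: partition -> log-end offset.
type fakeClient map[int32]int64

func (f fakeClient) Partitions(string) ([]int32, error) {
	ps := make([]int32, 0, len(f))
	for p := range f {
		ps = append(ps, p)
	}
	sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
	return ps, nil
}

func (f fakeClient) GetOffset(_ string, p int32, _ int64) (int64, error) {
	end, ok := f[p]
	if !ok {
		return 0, fmt.Errorf("unknown partition %d", p)
	}
	return end, nil
}

func main() {
	ends, err := logEndOffsets(fakeClient{0: 120, 1: 45}, "events")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ends)
}
```

Compared with building an `OffsetRequest` per leader by hand, this leaves leader lookup and request construction to the client.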
Lastly -1 is sarama.OffsetNewest. For a consumer group that means there is no offset stored (or it TTLed out) and the consumers would start at the next message published.
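Putting that together, lag for one partition is the log-end offset minus the group's committed offset, with -1 treated as "nothing stored" rather than as a number to subtract. A minimal sketch, assuming both offsets have already been fetched; the `consumerLag` helper and the `noStoredOffset` constant are hypothetical names, not sarama API.

```go
package main

import "fmt"

// noStoredOffset has the same value as sarama.OffsetNewest (-1), which an
// offset fetch returns when the group has no committed offset.
const noStoredOffset int64 = -1

// consumerLag returns how many messages the group is behind on a partition.
// ok is false when no committed offset exists, so the caller can print "-"
// instead of a misleading number.
func consumerLag(committed, logEnd int64) (lag int64, ok bool) {
	if committed < 0 {
		return 0, false
	}
	if committed > logEnd {
		// The two offsets are read at different moments, so the committed
		// one can briefly be ahead; report zero rather than a negative lag.
		return 0, true
	}
	return logEnd - committed, true
}

func main() {
	const logEnd = 120
	for _, committed := range []int64{noStoredOffset, 90, 120} {
		if lag, ok := consumerLag(committed, logEnd); ok {
			fmt.Printf("committed=%d end=%d lag=%d\n", committed, logEnd, lag)
		} else {
			fmt.Printf("committed=- end=%d lag=-\n", logEnd)
		}
	}
}
```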
Why don't you take a look at https://github.com/fgeller/kt, it's a CLI tool written in Go (single bin) that supports consumer group offsets + shows lag
@dim thank you much! I will definitely take a look at this.
As for posting in this repo @nsd20463, I apologize...I was pointed to this repo via the Sarama and thought my questions might be irrelevant to them given what I was trying to do. So I greatly appreciate your comments and explanation of the code.
I should have all I need at this point, so once again thank you both for your time and knowledge!
To whom it may concern,
I apologize, as I am attempting to look through the code and am unable to see where or how, if it is possible at all, I could grab the lag of a particular consumer group and topic and print it with a simple fmt.Println.
I am relatively new to Kafka, so please forgive my potential oversights =). I will continue to look to see if I can find it, so I apologize ahead of time if I do!
Thanks, Jason Cameron