birdayz / kaf

Modern CLI for Apache Kafka, written in Go.
Apache License 2.0

Consumer command panics sometimes with broker not connected #54

Closed hoenirvili closed 5 years ago

hoenirvili commented 5 years ago

I've imported my configuration from my ccloud using this command

kaf config import ccloud

Sometimes when I try to consume from a topic, I get this panic. It's unpredictable and I don't know why.

kaf consume <topic_that_I_want_to_consume_from>
panic: kafka: broker not connected

goroutine 57 [running]:
main.glob..func7.1(0x7ffeefbff82c, 0x2b, 0x4642220, 0xc00011c120, 0xc000026f28, 0x463fa80, 0xc0003b57c0, 0xc000278f58, 0xc000278f60, 0x8)
    /Users/nn/Work/Go/src/github.com/infinimesh/kaf/cmd/kaf/consume.go:96 +0xf7c
created by main.glob..func7
    /Users/nn/Work/Go/src/github.com/infinimesh/kaf/cmd/kaf/consume.go:85 +0x2a4

After I retry a couple of times, the same command works and I see my topic data.

hoenirvili commented 5 years ago

On further inspection, it looks like the call to Leader(topic string, partitionID int32) (*Broker, error) in the sarama package returns a pointer to a Broker, and the code then calls GetAvailableOffsets on it, which fails because the connection to that broker has somehow been lost.

It seems to me that this is an issue in sarama.

hoenirvili commented 5 years ago

After further investigation, it looks like the call to ldr.GetAvailableOffsets is sometimes a bit too slow, so adding an error-checking loop like the one in the following patch resolves the issue. I could send this as a pull request, @birdayz, if you want. To be honest, though, I'm not sure about this approach.

diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go
index 09e7a28..a2661c9 100644
--- a/cmd/kaf/consume.go
+++ b/cmd/kaf/consume.go
@@ -91,9 +91,19 @@ var consumeCmd = &cobra.Command{
                                if err != nil {
                                        panic(err)
                                }
-                               offsets, err := ldr.GetAvailableOffsets(req)
-                               if err != nil {
-                                       panic(err)
+                               var offsets *sarama.OffsetResponse
+                               for {
+                                       var err error
+                                       offsets, err = ldr.GetAvailableOffsets(req)
+                                       if err != nil {
+                                               continue
+                                       } else {
+                                               break
+                                       }
                                }
                                followOffset := offsets.GetBlock(topic, partition).Offset - 1

birdayz commented 5 years ago

I think this is reasonable. You can also call ldr.Connected() to check whether the broker is connected. If you want, you can open a PR.
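
The suggestion to check the connection first could look roughly like the sketch below. It is written against a small hypothetical interface, `leader`, rather than the real sarama type, so that it runs on its own. The `Connected` method mirrors the call mentioned above. The `Open` method is simplified to take no arguments, which is an assumption of this sketch. `ensureConnected` and `fakeLeader` are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
)

// leader is a hypothetical, trimmed-down view of the broker handle
// used in consume.go. Open is simplified for this sketch.
type leader interface {
	Connected() (bool, error)
	Open() error
}

// ensureConnected returns nil if the leader is already connected.
// Otherwise it tries to open the connection once and checks again.
func ensureConnected(l leader) error {
	if ok, _ := l.Connected(); ok {
		return nil
	}
	if err := l.Open(); err != nil {
		return fmt.Errorf("open broker: %w", err)
	}
	ok, err := l.Connected()
	if err != nil {
		return fmt.Errorf("check broker connection: %w", err)
	}
	if !ok {
		return errors.New("broker still not connected")
	}
	return nil
}

// fakeLeader is a test double that becomes connected once Open is called.
type fakeLeader struct {
	connected bool
	opens     int
}

func (f *fakeLeader) Connected() (bool, error) { return f.connected, nil }

func (f *fakeLeader) Open() error {
	f.opens++
	f.connected = true
	return nil
}

func main() {
	l := &fakeLeader{}
	fmt.Println(ensureConnected(l), l.opens)
}
```

A check like this would run just before the offsets request, so that a dropped connection is reported as an ordinary error instead of a panic.
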

hoenirvili commented 5 years ago

I think now, this can be closed. Thanks again for the awesome project.