confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Can't subscribe to topics #154

Closed: johnseekins closed this issue 6 years ago

johnseekins commented 6 years ago

Description

When connecting to our SASL-enabled cluster, I am able to list topics successfully with the client, but I can't seem to subscribe.

That is, this code:

        var topics []string
        if len(config.Topics) < 1 {
                topics = []string{"^.*"}
        } else {
                topics = config.Topics
        }
        fmt.Printf("Attempting to subscribe to: %v\n", topics)
        c.SubscribeTopics(topics, nil)
        topics, _ = c.Subscription()
        fmt.Printf("Subscribed to: %v\n", topics)

Produces this output:

Attempting to subscribe to: [ops-primespeed_load_generator-devOps-logs]
Subscribed to: []

(Here config.Topics was set to [ops-primespeed_load_generator-devOps-logs].)
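The fallback in the snippet above relies on librdkafka treating any subscription entry that starts with `^` as a regular expression rather than a literal topic name. Below is a minimal, pure-Go sketch of that topic-resolution step so it can be checked without a broker; `resolveTopics` and `isPattern` are hypothetical helpers for illustration, not part of confluent-kafka-go.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveTopics is a hypothetical helper mirroring the snippet above:
// with no configured topics it falls back to the "^.*" pattern,
// which librdkafka interprets as a regex matching every topic.
func resolveTopics(configured []string) []string {
	if len(configured) == 0 {
		return []string{"^.*"}
	}
	return configured
}

// isPattern reports whether librdkafka would treat a subscription entry
// as a regex (entries starting with "^") instead of a literal topic name.
func isPattern(topic string) bool {
	return strings.HasPrefix(topic, "^")
}

func main() {
	for _, t := range resolveTopics(nil) {
		fmt.Printf("%s pattern=%v\n", t, isPattern(t))
	}
	for _, t := range resolveTopics([]string{"ops-primespeed_load_generator-devOps-logs"}) {
		fmt.Printf("%s pattern=%v\n", t, isPattern(t))
	}
}
```

Either way, the resolved list is what gets passed to `SubscribeTopics`; whether it is a pattern or a literal name does not change the need to check the error that call returns.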

How to reproduce

Not sure...


Apologies if I'm providing the "ConfigMap" wrong. I'm still learning Go.

johnseekins commented 6 years ago

Additional Info:

kafkacfg := kafka.ConfigMap{
        "bootstrap.servers":               broker_list,
        "socket.nagle.disable":            true,
        "delivery.report.only.error":      true,
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "group.id":                        config.GroupID,
        "session.timeout.ms":              6000,
        "default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
}
if config.CompressionType != "" {
        if err := kafkacfg.SetKey("compression.codec", config.CompressionType); err != nil {
                panic(err)
        }
}
if config.EnableSASL && len(config.KafkaSASLOpts) > 0 {
        // Copy each SASL option into the consumer config.
        for k, v := range config.KafkaSASLOpts {
                if err := kafkacfg.SetKey(k, v); err != nil {
                        panic(err)
                }
        }
}
fmt.Printf("Config: %v\n", config)
c, err = kafka.NewConsumer(&kafkacfg)
if err != nil {
        fmt.Printf("Failed to create consumer: %s\n", err)
        panic(err)
}
fmt.Printf("Created Consumer %v\n", c)

Produces: Created Consumer rdkafka#consumer-1
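The config above layers optional compression and SASL settings on top of a base map. A minimal sketch of that merge step follows, assuming a plain `map[string]interface{}` stand-in for `kafka.ConfigMap` (which is itself a map from string keys to `interface{}`-typed `ConfigValue`s) so it runs without librdkafka. `buildConfig` is a hypothetical helper, and the broker address, group name, and SASL values in `main` are placeholders.

```go
package main

import "fmt"

// configMap stands in for kafka.ConfigMap so this sketch runs without librdkafka.
type configMap map[string]interface{}

// buildConfig is a hypothetical helper: it copies the base settings,
// then layers optional compression and SASL settings on top.
// Later keys overwrite earlier ones; the caller's base map is not modified.
func buildConfig(base configMap, compression string, enableSASL bool, saslOpts map[string]string) configMap {
	cfg := configMap{}
	for k, v := range base {
		cfg[k] = v
	}
	if compression != "" {
		cfg["compression.codec"] = compression
	}
	if enableSASL {
		for k, v := range saslOpts {
			cfg[k] = v
		}
	}
	return cfg
}

func main() {
	base := configMap{
		"bootstrap.servers": "broker1:9092",  // placeholder
		"group.id":          "example-group", // placeholder
	}
	cfg := buildConfig(base, "", true, map[string]string{
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
	})
	fmt.Println(len(cfg), cfg["security.protocol"])
}
```

Copying the base map first is a design choice: it lets the same base settings be reused for several consumers without one consumer's SASL options leaking into another's config. With the real client, the assignments would become `SetKey` calls whose returned errors are worth checking.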

edenhill commented 6 years ago

I can't reproduce this:

Created Consumer rdkafka#consumer-1
Attempting to subscribe to: [^.*]
Subscribed to: [^.*]

johnseekins commented 6 years ago

That's good. I assumed it was something dumb I was doing.

However, regardless of the topic I attempt to subscribe to, I always get an empty list afterwards.

Is there a way to increase the debug level so I can see what poor choice I'm making?

edenhill commented 6 years ago

Start by checking the error returned from Subscribe(). If there is no error there, you can enable "debug": "cgrp" and look for hints.
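Following that advice, the subscribe step from the original report can check both returned errors and confirm that the subscription actually took. The sketch below is hedged: `subscribeAndVerify` and `errEmptySubscription` are hypothetical names, and the subscribe and subscription calls are passed in as function values so the logic runs without a broker. With confluent-kafka-go you might pass `func(t []string) error { return c.SubscribeTopics(t, nil) }` and `c.Subscription`. The `"debug": "cgrp"` setting, by contrast, goes into the `kafka.ConfigMap` handed to `kafka.NewConsumer` and makes librdkafka log consumer-group activity.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptySubscription signals that subscribe reported success
// but the subscription call still returned no topics.
var errEmptySubscription = errors.New("subscription is empty after subscribe")

// subscribeAndVerify calls subscribe, returns its error instead of
// discarding it, then confirms the reported subscription is non-empty.
func subscribeAndVerify(
	topics []string,
	subscribe func([]string) error,
	subscription func() ([]string, error),
) ([]string, error) {
	if err := subscribe(topics); err != nil {
		return nil, fmt.Errorf("subscribe to %v: %w", topics, err)
	}
	got, err := subscription()
	if err != nil {
		return nil, fmt.Errorf("read subscription: %w", err)
	}
	if len(got) == 0 {
		return nil, errEmptySubscription
	}
	return got, nil
}

func main() {
	// Fake calls that simulate the failure reported in this thread.
	failing := func([]string) error { return errors.New("Local: Unknown group") }
	empty := func() ([]string, error) { return nil, nil }
	if _, err := subscribeAndVerify([]string{"^.*logs"}, failing, empty); err != nil {
		fmt.Println("Error:", err)
	}
}
```

Wrapping with `%w` keeps the original client error available to `errors.Is`/`errors.As` callers while adding the topic list for context.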

johnseekins commented 6 years ago

Trying to collect the error from the Subscribe() call results in a hung process, with this looping over and over in strace:

[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] futex(0xcfb770, FUTEX_WAIT, 0, {60, 0} <unfinished ...>
[pid 12887] <... restart_syscall resumed> ) = -1 ETIMEDOUT (Connection timed out)
[pid 12887] futex(0x2853ad0, FUTEX_WAKE_PRIVATE, 1) = 0
[pid 12887] futex(0x2853afc, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 93, {1520626917, 635341000}, ffffffff <unfinished ...>
[pid 12884] <... futex resumed> )       = -1 ETIMEDOUT (Connection timed out)
[pid 12884] futex(0x2853a10, FUTEX_WAKE_PRIVATE, 1) = 0
[pid 12884] futex(0xcfb770, FUTEX_WAKE, 1) = 1
[pid 12882] <... futex resumed> )       = 0
[pid 12884] futex(0x2853a3c, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 861, {1520626916, 798343000}, ffffffff <unfinished ...>
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)
[pid 12882] pselect6(0, NULL, NULL, NULL, {0, 20000}, NULL) = 0 (Timeout)

edenhill commented 6 years ago

I don't think it is necessary to debug this at the syscall level.

johnseekins commented 6 years ago

So after I remembered how code works, I actually got an error:

Attempting to subscribe to: [^.*logs]
Error: Local: Unknown group

(I had forgotten to actually call the subscribe function...)

johnseekins commented 6 years ago

Wow. I'm going to go ahead and close this ticket, and probably never bother you again.

I had forgotten to put in a value for my group.id field. Oddly enough, this failed in a cluster that has ACLs.
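Since the root cause was an empty `group.id`, a cheap guard is to validate it before creating the consumer, so the problem surfaces at startup rather than as `Local: Unknown group` from the subscribe call. A minimal sketch, again assuming a plain map as a stand-in for `kafka.ConfigMap`; `requireGroupID` is a hypothetical helper, not a confluent-kafka-go API, and the broker address is a placeholder.

```go
package main

import (
	"errors"
	"fmt"
)

// configMap stands in for kafka.ConfigMap so the sketch runs without librdkafka.
type configMap map[string]interface{}

// requireGroupID returns an error unless cfg holds a non-nil, non-empty
// value for "group.id". Intended to be called before kafka.NewConsumer.
func requireGroupID(cfg configMap) error {
	v, ok := cfg["group.id"]
	if !ok || v == nil || fmt.Sprint(v) == "" {
		return errors.New("group.id must be set to a non-empty value")
	}
	return nil
}

func main() {
	// Mirrors the bug in this thread: group.id present but empty.
	cfg := configMap{"bootstrap.servers": "broker1:9092", "group.id": ""}
	if err := requireGroupID(cfg); err != nil {
		fmt.Println("refusing to start consumer:", err)
	}
}
```

Checking for a present-but-empty value matters here because the original config did include a `group.id` key; it was `config.GroupID` that was empty.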

edenhill commented 6 years ago

Please do bother me again; happy you got it sorted out!