IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.59k stars 1.76k forks source link

Azure Event Hubs: Broker not connected error when using alias for disaster recovery #2223

Closed sgallizia closed 1 year ago

sgallizia commented 2 years ago
Versions
Sarama Kafka Go
1.32.0 Azure Event Hubs 1.18
Configuration

What configuration values are you using for Sarama and Kafka?

prdCfg.Version = sarama.V1_0_2_0 //I have also tried with V1_0_0_0
prdCfg.Producer.Return.Successes = true
cnsCfg.Version = sarama.V1_0_2_0
if config.Cfg.SaramaLog == "y" {
sarama.Logger, _ = zap.NewStdLogAt(logging.Get().Parent.With(zap.String("name", "sarama")), zapcore.DebugLevel)
}
cnsCfg.Consumer.Return.Errors = true
if strings.Contains(config.Cfg.OutputBinderBroker, "localhost") {
return
}
prdCfg.Net.SASL.Enable = true
prdCfg.Net.DialTimeout = 10 * time.Second
prdCfg.Net.SASL.User = config.Cfg.OutputBinderUser
prdCfg.Net.SASL.Password = config.Cfg.OutputBinderPassword
prdCfg.Net.SASL.Mechanism = "PLAIN"
prdCfg.Net.TLS.Enable = true
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
ClientAuth:         0,
}
prdCfg.Net.TLS.Config = tlsConfig

cnsCfg.Net.SASL.Enable = true
cnsCfg.Net.DialTimeout = 10 * time.Second
cnsCfg.Net.SASL.User = config.Cfg.InputBinderUser
cnsCfg.Net.SASL.Password = config.Cfg.InputBinderPassword
cnsCfg.Net.SASL.Mechanism = "PLAIN"
cnsCfg.Net.TLS.Enable = true
cnsCfg.Net.TLS.Config = tlsConfig
Logs
logs: CLICK ME

``` {"log.level":"debug","@timestamp":"2022-04-28T14:41:30.585Z","log.origin":{"file.name":"sarama@v1.32.0/consumer.go","file.line":920},"message":"consumer/broker/0 added subscription to reset-password/1","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.590Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":240},"message":"Connected to broker at st-evnt-bked-cana-we-01.servicebus.windows.net:9093 (registered as #0)","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.590Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":1242},"message":"SASL authentication successful with broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093:4 - [0 0 0 0]","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.588Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":1169},"message":"Successful SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"sarama@v1.32.0/client.go","file.line":959},"message":"client/brokers replaced registered broker #0 with st-evnt-bked-cana-we-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":163},"message":"ClientID is the default of 'sarama', you should consider setting it to something application-specific.","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.542Z","log.origin":{"file.name":"sarama@v1.32.0/consumer.go","file.line":920},"message":"consumer/broker/0 added subscription to reset-password/0","name":"sarama","ecs.version":"1.6.0"} 
{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.542Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":299},"message":"Closed connection to broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"sarama@v1.32.0/client.go","file.line":883},"message":"client/metadata fetching metadata for [reset-password] from broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"sarama@v1.32.0/client.go","file.line":883},"message":"client/metadata fetching metadata for [reset-password] from broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"sarama@v1.32.0/utils.go","file.line":43},"message":"consumer/broker/0 disconnecting due to error processing FetchRequest: kafka: broker not connected","name":"sarama","ecs.version":"1.6.0"} {"log.level":"error","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/1: kafka: broker not connected"},"ecs.version":"1.6.0"} 
{"log.level":"error","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/0: kafka: broker not connected"},"ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":299},"message":"Closed connection to broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.403Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":1242},"message":"SASL authentication successful with broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093:4 - [0 0 0 0]","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.404Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":240},"message":"Connected to broker at st-evnt-bked-cana-geo-01.servicebus.windows.net:9093 (registered as #0)","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.402Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":1169},"message":"Successful SASL handshake. 
Available mechanisms: [PLAIN OAUTHBEARER]","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"sarama@v1.32.0/client.go","file.line":1072},"message":"client/coordinator coordinator for consumergroup reset-password-dequeuer is #0 (st-evnt-bked-cana-geo-01.servicebus.windows.net:9093)","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"sarama@v1.32.0/client.go","file.line":596},"message":"client/brokers replaced registered broker #0 with st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"sarama@v1.32.0/broker.go","file.line":163},"message":"ClientID is the default of 'sarama', you should consider setting it to something application-specific.","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.308Z","log.origin":{"file.name":"sarama@v1.32.0/client.go","file.line":1050},"message":"client/coordinator requesting coordinator for consumergroup reset-password-dequeuer from st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"} {"log.level":"error","@timestamp":"2022-04-28T14:41:26.308Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, 
userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/0: kafka: broker not connected"},"ecs.version":"1.6.0"} {"log.level":"error","@timestamp":"2022-04-28T14:41:26.308Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/1: kafka: broker not connected"},"ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.623Z","log.origin":{"file.name":"kafka/producer.go","file.line":34},"message":"message sent","partition":0,"offset":14,"ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"sarama@v1.32.0/utils.go","file.line":43},"message":"producer/broker/0 starting up","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.586Z","log.origin":{"file.name":"sarama@v1.32.0/utils.go","file.line":43},"message":"producer/broker/0 state change to [open] on reset-password/0","name":"sarama","ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"kafka/handler.go","file.line":40},"message":"Message processing","topic":"reset-password","partition":0,"offset":15,"ecs.version":"1.6.0"} {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"kafka/handler.go","file.line":41},"message":"Message content","content":"{\r\n \"Event\": 
{\r\n \"id\": \"0123456789\",\r\n \"type\": \"AbilitazioniServiziTelematici\",\r\n \"notification_time\": 1634131014999\r\n },\r\n \"Action\": {\r\n \"type\": \"Update\",\r\n \"subtype\": \"Reset_password\"\r\n },\r\n \"Element\": {\r\n \"client_code\": \"0056999\"\r\n },\r\n \"Data\": {\r\n \"fiscal_code\": \"TSTMJR57C1xxxxxx\"\r\n }\r\n}","ecs.version":"1.6.0"} ```

Problem Description

We have a service written in Go which connects to Azure Event Hubs through the sarama library: it reads a message from one queue and publishes it to another. We receive an error from the sarama consumer every time the service reads a message from the queue:

"kafka: error while consuming reset-password/1: kafka: broker not connected"

The error occurs when the service connects to Event Hubs through the alias for disaster recovery (broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093). We have the alias st-evnt-bked-cana-geo-01 with the primary namespace st-evnt-bked-cana-we-01 and the secondary namespace st-evnt-bked-cana-ne-01. If we connect directly to the broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093, then the error doesn't occur. Despite this error, the message is received and published on the output queue, but we are concerned about the impact on performance, because we can see from the logs that sarama reconnects to another broker (st-evnt-bked-cana-geo-01 or st-evnt-bked-cana-we-01). We have already opened a ticket with Azure, but according to Microsoft everything is OK.

sgallizia commented 2 years ago

My colleague fixed the issue by changing how the consumer group is created:

// newConsumerGroup builds a consumer group that connects to the broker
// returned by getBroker instead of to the configured bootstrap address
// (config.Value.Stream.Broker).
//
// It works in two phases:
//  1. open a short-lived client against the bootstrap address and hand it to
//     getBroker to resolve the broker to use;
//  2. rewrite the SASL password so that it refers to that broker, then create
//     the consumer group against that broker's address.
//
// cnsCfg is modified in place: on success Metadata.RefreshFrequency is left at
// 10 minutes and every occurrence of the bootstrap host in Net.SASL.Password
// has been replaced with the resolved broker address.
//
// The group is created with sarama.NewConsumerGroup rather than
// NewClient + NewConsumerGroupFromClient so that the group owns its underlying
// client: the client is released if group creation fails and when the
// returned group is closed, instead of being leaked by this function.
func newConsumerGroup(cnsCfg *sarama.Config) (sarama.ConsumerGroup, error) {
    // NOTE(review): a zero refresh frequency is documented by sarama as
    // disabling the background metadata refresh — presumably so the lookup
    // client only refreshes when getBroker asks it to; confirm.
    cnsCfg.Metadata.RefreshFrequency = 0

    // Ownership of this bootstrap client passes to getBroker, which closes it.
    client, err := sarama.NewClient([]string{config.Value.Stream.Broker}, cnsCfg)
    if err != nil {
        return nil, err
    }
    broker, err := getBroker(client)
    if err != nil {
        return nil, err
    }

    // strings.Split always yields at least one element, so index 0 is safe
    // even when the configured address carries no ":port" suffix.
    bootstrapHost := strings.Split(config.Value.Stream.Broker, ":")[0]
    // NOTE(review): the host (without port) is replaced by broker.Addr(),
    // which presumably includes the port — confirm that this is the form the
    // password is expected to contain.
    cnsCfg.Net.SASL.Password = strings.ReplaceAll(
        cnsCfg.Net.SASL.Password, bootstrapHost, broker.Addr())
    cnsCfg.Metadata.RefreshFrequency = 10 * time.Minute

    group, err := sarama.NewConsumerGroup(
        []string{broker.Addr()}, config.Value.Stream.ConsumerGroup, cnsCfg)
    if err != nil {
        return nil, err
    }
    return group, nil
}

// getBroker resolves the broker registered under the ID of the coordinator
// for the configured consumer group (config.Value.Stream.ConsumerGroup).
//
// The lookup is done in three steps: ask the client for the group
// coordinator, refresh the client's metadata, then fetch the broker registered
// under the coordinator's ID. The broker is re-read after the refresh rather
// than returning the coordinator directly; presumably the refresh can replace
// the address registered for that ID (the issue's logs show broker #0 being
// re-registered under a different host) — confirm.
//
// getBroker takes ownership of client and closes it on every return path, so
// a failed lookup no longer leaks the connection. An error from Close is
// returned when the lookup itself succeeded, instead of being discarded.
//
// Returns the resolved broker, or nil and the first error encountered.
func getBroker(client sarama.Client) (broker *sarama.Broker, err error) {
    defer func() {
        // Keep the lookup error if there is one; otherwise surface the
        // close error and do not hand back a broker alongside it.
        if closeErr := client.Close(); closeErr != nil && err == nil {
            broker, err = nil, closeErr
        }
    }()

    coordinator, err := client.Coordinator(config.Value.Stream.ConsumerGroup)
    if err != nil {
        return nil, err
    }
    if err := client.RefreshMetadata(); err != nil {
        return nil, err
    }
    resolved, err := client.Broker(coordinator.ID())
    if err != nil {
        return nil, err
    }
    return resolved, nil
}

I suspect that something is wrong on the Azure Event Hubs side: I think that the Kafka API wrapper on top of the Event Hubs system sometimes behaves strangely.

github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

Sounds like this issue was resolved 👍🏻