confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

consumer error with version 2.6.0 #1330

Open · ucanme opened this issue 1 week ago

ucanme commented 1 week ago

Description

When I consume, the client returns the errors below:

    sasl_plaintext://10.132.117.36:39094/4: Disconnected (after 66ms in state UP, 3 identical error(s) suppressed)
    %6|1730885769.578|FAIL|rdkafka#consumer-1| [thrd:sasl_plaintext://10.132.117.46:39094/1]: sasl_plaintext://10.132.117.46:39094/1: Disconnected (after 122ms in state UP, 4 identical error(s) suppressed)

When I change the version back to 2.4.0, the error disappears.

How to reproduce

Here is my code:

package kafka

import (
    "context"
    "os"
    "os/signal"
    "strings"
    "syscall"

    "logger" // internal logging package

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type ConsumerConfig struct {
    Addr              []string
    Topic             []string
    GroupId           string
    Version           string
    User              string
    Password          string
    OffsetReset       string
    MaxPollIntervalMs int
    SessionTimeOut    int
}

type MsgMeta struct {
    Partition int32
    Topic     string
    Offset    int64
}

type QueueConsumer struct {
    ready chan bool
}

func GroupConsumer(ctx context.Context, kafkaConf ConsumerConfig, consumeFunc func(ctx context.Context, ev kafka.Event)) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":     strings.Join(kafkaConf.Addr, ","),
        "broker.address.family": "v4",
        "group.id":              kafkaConf.GroupId,
        "session.timeout.ms":    kafkaConf.SessionTimeOut,
        "auto.offset.reset":     kafkaConf.OffsetReset,
        "sasl.mechanisms":       "PLAIN",
        "security.protocol":     "SASL_PLAINTEXT",
        //"max.poll.records":      1,
        "enable.auto.commit":      true,
        "auto.commit.interval.ms": 60000,
        "max.poll.interval.ms":    kafkaConf.MaxPollIntervalMs,
        "sasl.username":           kafkaConf.User,
        "sasl.password":           kafkaConf.Password,
    })

    if err != nil {
        logger.Info(ctx, "Error creating consumer group client:%v", err)
        panic(err)
    }
    logger.Info(ctx, "kafka connect success topic :%v", kafkaConf.Topic)

    err = c.SubscribeTopics(
        kafkaConf.Topic,
        func(consumer *kafka.Consumer, event kafka.Event) error {
            logger.Info(ctx, "kafka event : %s", event.String())
            return nil
        },
    )

    if err != nil {
        logger.Panic(ctx, "Error subscribing to topic fileUploadNotifyTopic: %v", err)
    }

    run := true
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    for run {
        select {
        case sig := <-sigchan:
            logger.Error(ctx, "Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(1000)
            if ev == nil {
                continue
            }
            processCtx := logger.CtxWithRequestID(context.Background())
            consumeFunc(processCtx, ev)
        }
    }

    logger.Info(ctx, "Closing consumer")
    c.Close()
}
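The Poll loop above forwards every event to consumeFunc, including the error events behind the Disconnected log lines. With the real client, a handler typically type-switches on the event (e.g. *kafka.Message vs kafka.Error) so errors are logged separately from messages. Below is a minimal self-contained sketch of that pattern, using stand-in Event/Message/Error types so it runs without a broker; the stand-ins only mirror the shape of the library's types and are not the real ones:

```go
package main

import "fmt"

// Stand-in for the library's kafka.Event interface, which exposes String().
type Event interface {
	String() string
}

// Stand-in for *kafka.Message (handled as a pointer, like the real type).
type Message struct{ Value string }

func (m *Message) String() string { return m.Value }

// Stand-in for kafka.Error (handled as a value, like the real type).
type Error struct{ Reason string }

func (e Error) String() string { return e.Reason }

// classify mirrors the type switch a consumeFunc could use to separate
// error events (such as the Disconnected errors above) from messages.
func classify(ev Event) string {
	switch ev.(type) {
	case Error:
		return "error"
	case *Message:
		return "message"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(Error{Reason: "Disconnected"})) // prints "error"
	fmt.Println(classify(&Message{Value: "payload"}))    // prints "message"
}
```

In the consumer above, the same switch would go inside consumeFunc (or directly in the Poll loop), so transport errors are surfaced instead of being handled as ordinary events.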

Here is my test case:

package kafka

import (
    "context"
    "fmt"
    "testing"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func TestMain(m *testing.M) {
    m.Run()
}

func TestConsumer(t *testing.T) {
    cfg := ConsumerConfig{
        Addr:              []string{"xxxxxx"},
        Topic:             []string{"xxxx"},
        GroupId:           "hello",
        User:              "xxxx",
        Password:          "xxxx",
        OffsetReset:       "earliest",
        MaxPollIntervalMs: 10000,
        SessionTimeOut:    6000,
    }
    ctx := context.Background()
    GroupConsumer(ctx, cfg, func(ctx context.Context, ev kafka.Event) {
        fmt.Println("---ev----", ev.String())
    })
}

Checklist

Please provide the following information:

milindl commented 2 days ago

Hello @ucanme , could you try this with debug logs and attach them here?

Just add "debug": "all" to your config to obtain the debug logs. What is the broker version that you are connecting to?
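As a sketch of where that setting goes, assuming the GroupConsumer config from the report (only the "debug" line is new; the other keys are abbreviated here, not removed):

```go
c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": strings.Join(kafkaConf.Addr, ","),
	"group.id":          kafkaConf.GroupId,
	// ... other settings unchanged ...
	"debug": "all", // emit verbose librdkafka debug logs for every subsystem
})
```

The resulting logs include broker connection state transitions, which should show why the connections to the brokers are being torn down.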