segmentio / kafka-go

Kafka library in Go
MIT License
7.51k stars 779 forks source link

WriteMessages "Unexpected EOF" , config with sasl-plain #795

Closed tong3jie closed 2 years ago

tong3jie commented 2 years ago

Describe the bug: when the writer sends a message to the Kafka server, it reports "Unexpected EOF".

Both of the error reports are about io or net.

I can't solve it


Kafka Version: Kafka server 0.10.22, kafka-go 0.4.23


To Reproduce

package utils

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "os"
    "path"
    "strings"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)

// Client bundles the reader and writer created for one topic together with
// the context that is passed to their read/write calls.
type Client struct {
    r      *kafka.Reader      // consumer for the topic
    w      *kafka.Writer      // producer for the topic
    ctx    context.Context    // passed to ReadMessage / WriteMessages
    cancel context.CancelFunc // cancels ctx; called when the topic is paused or closed
}

// Kafka holds the per-topic clients and the subscription callback.
type Kafka struct {
    // Clients maps a topic name to its reader/writer pair.
    Clients map[string]*Client
    // Topics is the set of topic names reported by the broker's partition
    // metadata; it may contain topics that have no entry in Clients.
    Topics map[string]struct{}
    // CallBack is started in its own goroutine by Sub and receives the
    // channel on which consumed message values are delivered.
    CallBack func(message chan []byte)
    // Lock is taken by Pause while it swaps a topic's client.
    Lock sync.Mutex
}

// NewKafkaDialer builds the kafka.Dialer used by the readers and writers in
// this package: TLS configured with the CA bundle read from
// <working dir>/utils/ca.cert, plus SASL/PLAIN credentials from Conf.Kafka.
//
// Problems locating, reading or parsing the CA file are logged and a dialer
// is still returned, because the signature cannot report an error.
func NewKafkaDialer() *kafka.Dialer {
    conf := Conf

    pwd, err := os.Getwd()
    if err != nil {
        // pwd is empty in this case, so the path below degrades to
        // "/utils/ca.cert"; log it instead of silently ignoring the failure.
        Logger.Error("kafka client get working directory failed : " + err.Error())
    }

    clientCertPool := x509.NewCertPool()
    certBytes, err := ioutil.ReadFile(path.Join(pwd, "/utils/ca.cert"))
    if err != nil {
        Logger.Error("kafka client read cert file failed : " + err.Error())
    } else if !clientCertPool.AppendCertsFromPEM(certBytes) {
        // Only report a parse failure when there was something to parse.
        Logger.Error("kafka client failed to parse root certificate")
    }

    return &kafka.Dialer{
        ClientID:  "clientId",
        Timeout:   time.Second * 10,
        DualStack: true,
        TLS: &tls.Config{
            RootCAs: clientCertPool,
            // NOTE(review): InsecureSkipVerify disables verification of the
            // broker's certificate chain and host name, which makes RootCAs
            // above ineffective. Left as-is to preserve behaviour; confirm
            // whether it can be removed.
            InsecureSkipVerify: true,
        },
        SASLMechanism: plain.Mechanism{
            Username: conf.Kafka.UserName,
            Password: conf.Kafka.Password,
        },
    }
}

// NewKafka connects to the first broker listed in Conf.Kafka.Broker, reads the
// cluster's partition metadata and creates one reader/writer pair per entry
// in topics.
//
// callback is stored on the returned Kafka and later started by Sub.
//
// It returns nil when the broker cannot be reached, the metadata cannot be
// read, or any requested topic does not exist on the cluster.
func NewKafka(topics []string, callback func(message chan []byte)) *Kafka {
    conf := Conf
    brokers := strings.Split(conf.Kafka.Broker, ",")
    dialer := NewKafkaDialer()

    conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
    if err != nil {
        Logger.Error("kafka dial failed : " + err.Error())
        // conn is nil here; continuing would panic on ReadPartitions.
        return nil
    }
    // The connection is only needed for the metadata lookup below.
    defer func() {
        if cerr := conn.Close(); cerr != nil {
            Logger.Error("kafka close metadata connection failed : " + cerr.Error())
        }
    }()

    partitions, err := conn.ReadPartitions()
    if err != nil {
        Logger.Error("kafka read partitions failed : " + err.Error())
        return nil
    }

    known := make(map[string]struct{}, len(partitions))
    for _, partition := range partitions {
        known[partition.Topic] = struct{}{}
    }

    // Validate every topic before creating any client so that an unknown
    // topic does not leave already-started readers/writers behind.
    for _, topic := range topics {
        if _, ok := known[topic]; !ok {
            return nil
        }
    }

    kafkaClients := make(map[string]*Client, len(topics))
    for _, topic := range topics {
        if _, dup := kafkaClients[topic]; dup {
            // A repeated topic would overwrite (and leak) the first client.
            continue
        }
        r := kafka.NewReader(kafka.ReaderConfig{
            Brokers:        brokers,
            Topic:          topic,
            GroupID:        "server",
            MinBytes:       10e2,
            MaxBytes:       10e5,
            Dialer:         dialer,
            CommitInterval: 1 * time.Second,
        })
        w := kafka.NewWriter(kafka.WriterConfig{
            Brokers:  brokers,
            Topic:    topic,
            Balancer: &kafka.Hash{},
            Dialer:   dialer,
        })
        ctx, cancel := context.WithCancel(context.Background())
        kafkaClients[topic] = &Client{
            r:      r,
            w:      w,
            ctx:    ctx,
            cancel: cancel,
        }
    }

    return &Kafka{
        Clients:  kafkaClients,
        Topics:   known,
        CallBack: callback,
    }
}

// Pub writes msg to topic using that topic's writer. Write errors are logged.
//
// It is a no-op when the topic is unknown or currently has no client (for
// example a cluster topic that was never requested, or one that is paused).
// The client lookup takes k.Lock, so Pub waits while another goroutine holds
// that lock.
func (k *Kafka) Pub(topic string, msg []byte) {
    if _, ok := k.Topics[topic]; !ok {
        return
    }

    // Topics can contain topics without a client, and Pause removes clients,
    // so the Clients entry must be checked before it is dereferenced.
    k.Lock.Lock()
    c := k.Clients[topic]
    k.Lock.Unlock()
    if c == nil {
        return
    }

    if err := c.w.WriteMessages(c.ctx, kafka.Message{Value: msg}); err != nil {
        Logger.Error("kafka publish failed : " + err.Error())
    }
}

// Sub starts consuming every topic in topics and hands the message values to
// k.CallBack through a shared buffered channel.
//
// Nothing is started if any topic is unknown or has no client. Each topic is
// read by its own goroutine, which stops after the first read error
// (including cancellation of the client's context); the channel is never
// closed.
func (k *Kafka) Sub(topics []string) {
    // Resolve all clients up front, under the lock, so the reader goroutines
    // never touch the Clients map.
    clients := make([]*Client, 0, len(topics))
    seen := make(map[string]struct{}, len(topics))

    k.Lock.Lock()
    for _, t := range topics {
        _, known := k.Topics[t]
        c := k.Clients[t]
        if !known || c == nil {
            k.Lock.Unlock()
            return
        }
        if _, dup := seen[t]; dup {
            continue
        }
        seen[t] = struct{}{}
        clients = append(clients, c)
    }
    k.Lock.Unlock()

    ch := make(chan []byte, 10000)
    for _, c := range clients {
        // One goroutine per topic: a single goroutine looping over topics
        // would block forever on the first topic's read loop.
        go func(c *Client) {
            for {
                m, err := c.r.ReadMessage(c.ctx)
                if err != nil {
                    Logger.Error("kafka subscribe failed : " + err.Error())
                    return
                }
                ch <- m.Value
            }
        }(c)
    }
    go k.CallBack(ch)
}

// Pause stops the client of topic, waits five seconds and then calls Resume
// to create a fresh client. The call blocks for the whole pause.
//
// It is a no-op when the topic is unknown or has no client to pause.
func (k *Kafka) Pause(topic string) {
    if _, ok := k.Topics[topic]; !ok {
        return
    }

    // Detach the client under the lock, but do the slow work (closing,
    // sleeping) without holding it.
    k.Lock.Lock()
    c := k.Clients[topic]
    delete(k.Clients, topic)
    k.Lock.Unlock()
    if c == nil {
        return
    }

    c.cancel()
    // Close the reader and writer so that the old consumer-group member and
    // its connections do not linger after the client is replaced.
    if err := c.r.Close(); err != nil {
        Logger.Error("failed to close reader:" + err.Error())
    }
    if err := c.w.Close(); err != nil {
        Logger.Error("failed to close writer:" + err.Error())
    }

    time.Sleep(time.Second * 5)

    // Resume writes to k.Clients and does not lock by itself.
    k.Lock.Lock()
    defer k.Lock.Unlock()
    k.Resume(topic)
}

// Resume creates a new reader/writer pair for topic and stores it in
// k.Clients. It is a no-op when the topic is unknown or already has a client.
//
// Resume does not take k.Lock itself; Pause calls it with the lock held, and
// other callers must serialize access to k.Clients themselves.
func (k *Kafka) Resume(topic string) {
    if _, ok := k.Topics[topic]; !ok {
        return
    }
    if c := k.Clients[topic]; c != nil {
        // Replacing a live client would leak its reader and writer.
        return
    }

    brokers := strings.Split(Conf.Kafka.Broker, ",")
    // One dialer is enough for both sides; building it reads the CA file.
    dialer := NewKafkaDialer()

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  brokers,
        Topic:    topic,
        GroupID:  "server",
        MinBytes: 10e2,
        MaxBytes: 10e5,
        Dialer:   dialer,
        // Same commit interval as the readers created by NewKafka, so a
        // resumed topic does not switch commit behaviour.
        CommitInterval: 1 * time.Second,
    })
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  brokers,
        Topic:    topic,
        Balancer: &kafka.Hash{},
        Dialer:   dialer,
    })
    ctx, cancel := context.WithCancel(context.Background())
    k.Clients[topic] = &Client{
        r:      r,
        w:      w,
        ctx:    ctx,
        cancel: cancel,
    }
}

// Close cancels every client's context and closes its reader and writer.
// Close errors are logged; the remaining clients are still closed.
func (k *Kafka) Close() {
    // Clients is modified by Pause, so iterate under the same lock.
    k.Lock.Lock()
    defer k.Lock.Unlock()

    for _, c := range k.Clients {
        if c == nil {
            continue
        }
        c.cancel()
        if err := c.r.Close(); err != nil {
            Logger.Error("failed to close reader:" + err.Error())
        }
        // The writer owns connections too and must be closed as well.
        if err := c.w.Close(); err != nil {
            Logger.Error("failed to close writer:" + err.Error())
        }
    }
}

Expected behavior


Additional context: because Kafka is not a priority queue, I must pause some low-priority topics and resume them after a short time.

rhansen2 commented 2 years ago

@lxxwilliam @tong3jie I believe I may have found something related to this issue. Would you be able to try out https://github.com/rhansen2/kafka-go/tree/transport-saslv0 and let me know if it helps?

Thanks!

Jrmy2402 commented 2 years ago

It's working for me with your version! Thanks a lot :)

rhansen2 commented 2 years ago

Fixed via #869

lxxwilliam commented 2 years ago

@lxxwilliam @tong3jie I believe I may have found something related to this issue. Would you be able to try out https://github.com/rhansen2/kafka-go/tree/transport-saslv0 and let me know if it helps?

Thanks!

I ran into another problem when using v0.4.31: the writer always returns an error like "read tcp xxx.xxx.xxx.xxx:xxxx->xxx.xxx.xxx.xxx:xxxx: i/o timeout", but with an older version such as v0.3.7 the writer works. Please check it. Thanks a lot :)

rhansen2 commented 2 years ago

@lxxwilliam Please open a new issue and provide a runnable reproduction case.

Thanks!