ego-component / ekafka

ekafka,kafka,ego
MIT License
7 stars 5 forks source link

当kafka服务断掉再启动后,ekafka无法自动连接 #19

Open songkaiha opened 4 months ago

songkaiha commented 4 months ago

我看ekafka的底层用的是kafka-go,单独使用kafka-go是支持kafka断掉再启动后,服务能够自动连接kafka的。

但是ekafka封装以后,服务无法重连kafka

askuy commented 4 months ago

我看ekafka的底层用的是kafka-go,单独使用kafka-go是支持kafka断掉再启动后,服务能够自动连接kafka的。

但是ekafka封装以后,服务无法重连kafka

是用的kafka-go测试的kafka端掉后在启动。我看下kafka-go的版本release信息

songkaiha commented 4 months ago

我看ekafka的底层用的是kafka-go,单独使用kafka-go是支持kafka断掉再启动后,服务能够自动连接kafka的。 但是ekafka封装以后,服务无法重连kafka

是用的kafka-go测试的kafka端掉后在启动。我看下kafka-go的版本release信息

您这可以本地实验一下,本地起一个kafka。ego服务起来后,再重启kafka,看服务能否自动重连

jackcipher commented 4 months ago

重连效果

image image image

Kafka 配置

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "baeldung:1:1"
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge

示例代码

package main

import (
    "context"
    "fmt"
    "log"
    "strings"

    "github.com/BurntSushi/toml"
    "github.com/gotomicro/ego/core/econf"
    "github.com/segmentio/kafka-go"

    "github.com/ego-component/ekafka"
)

// produce 生产消息
func produce(w *ekafka.Producer) {
    // 生产3条消息
    err := w.WriteMessages(context.Background(),
        &ekafka.Message{Key: []byte("Key-A"), Value: []byte("Hello World!")},
        &ekafka.Message{Key: []byte("Key-B"), Value: []byte("One!")},
        &ekafka.Message{Key: []byte("Key-C"), Value: []byte("Two!")},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }
    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }
    fmt.Println(`produce message succ--------------->`)
}

// consume 使用consumer/consumerGroup消费消息
func consume(r *ekafka.Consumer) {
    ctx := context.Background()
    for {
        // ReadMessage 再收到下一个Message时,会阻塞
        msg, _, err := r.ReadMessage(ctx)
        if err != nil {
            fmt.Println("err", err)
            continue
            //panic("could not read message " + err.Error())
        }
        // 打印消息
        fmt.Println("received: ", string(msg.Value))
        err = r.CommitMessages(ctx, &msg)
        if err != nil {
            log.Printf("fail to commit msg:%v", err)
        }
    }
}

func main() {
    var stopCh = make(chan bool)
    // 假设你配置的toml如下所示
    conf := `
[kafka]
    debug=true
    brokers=["localhost:9093"]
    [kafka.client]
        timeout="3s"
    [kafka.producers.p1]        # 定义了名字为p1的producer
        topic="sre-infra-test"  # 指定生产消息的topic
        balancer="my-balancer"  # 指定balancer,此balancer非默认balancer,需要使用ekafka.WithRegisterBalancer()注册
    [kafka.consumers.c1]        # 定义了名字为c1的consumer
        topic="sre-infra-test"  # 指定消费的topic
        groupID="group-1"       # 如果配置了groupID,将初始化为consumerGroup   

`
    // 加载配置文件
    err := econf.LoadFromReader(strings.NewReader(conf), toml.Unmarshal)
    if err != nil {
        panic("LoadFromReader fail," + err.Error())
    }

    // 初始化ekafka组件
    cmp := ekafka.Load("kafka").Build(
        // 注册名为my-balancer的自定义balancer
        ekafka.WithRegisterBalancer("my-balancer", &kafka.Hash{}),
    )

    // 使用p1生产者生产消息
    go produce(cmp.Producer("p1"))

    // 使用c1消费者消费消息
    consume(cmp.Consumer("c1"))

    stopCh <- true
}

kafka-go 重连流程

  1. kafka-go 在 NewConsumerGroup 会开启一个 goroutine:

    go func() {
     cg.run()
    cg.wg.Done()
    }()
  2. cg.run() 中有一个 for 循环,执行 nextGeneration

    for {
    memberID, err = cg.nextGeneration(memberID)
    ...
  3. nextGeneration 中调用 coordinator

    func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
    // get a new connection to the coordinator on each loop.  the previous
    // generation could have exited due to losing the connection, so this
    // ensures that we always have a clean starting point.  it means we will
    // re-connect in certain cases, but that shouldn't be an issue given that
    // rebalances are relatively infrequent under normal operating
    // conditions.
    conn, err := cg.coordinator()
  4. coordinator 执行回调函数 connect

  5. connect 在 Validate 中初始化

    
    // Validate method validates ConsumerGroupConfig properties and sets relevant
    // defaults.
    func (config *ConsumerGroupConfig) Validate() error {
    ...
    if config.connect == nil {
        config.connect = makeConnect(*config)
    }
    
    return nil
    }
  6. 执行 makeConnect

    // connect returns a connection to ANY broker.
    func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
    return func(dialer *Dialer, brokers ...string) (coordinator, error) {
        var err error
        for _, broker := range brokers {
            var conn *Conn
            if conn, err = dialer.Dial("tcp", broker); err == nil {
                return &timeoutCoordinator{
                    conn:             conn,
                    timeout:          config.Timeout,
                    sessionTimeout:   config.SessionTimeout,
                    rebalanceTimeout: config.RebalanceTimeout,
                }, nil
            }
        }
        return nil, err // err will be non-nil
    }
    }
  7. 执行 Dial --> DialContext --> d.connect

    func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) {
    return d.connect(
        ctx,
        network,
        address,
        ConnConfig{
            ClientID:        d.ClientID,
            TransactionalID: d.TransactionalID,
        },
    )
    }

也就是说,消费组在启动的时候,就会有 goroutine 去 re-connect.

具体重练的时机在 makeConnect 中,重连的配置为 timeoutCoordinator, 默认 5s 重连一次

默认重连配置见 https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L63

songkaiha commented 4 months ago

@jackcipher 我原本用的是ConsumerGroup的方式,这种方式不能重连。改成Consumer的方式(也就是你示例代码这种)后,可以重连。这两种方式区别就是一个是对于kafka-go的套用?另一个是进行了其他改造?如果不影响什么,我暂时先用Consumer方式吧。

jackcipher commented 4 months ago

@songkaiha 你好!我们的 example 代码中有个 bug,里面的 consumptionErrors 是一个无缓冲 channel。在出现消费失败时,会往该 channel 写入 err。由于无缓冲 channel 的特性,只有当读写都准备就绪时,才能正常工作;当一方未就绪时,则会出现阻塞。这使得 Kafka 重启时,该 channel 一直在等待,阻塞了后续流程。

该 API 需要调用方正确处理 channel,这也增加了复杂度。因为这个方法被标记为 Deprecated,建议使用新的 API: OnConsumeEachMessage。可以参考以下链接: https://github.com/ego-component/ekafka/blob/master/consumerserver/component.go#L125

相关 PR: Pull Request #20