childe / gohangout

使用 golang 模仿的 Logstash。用于消费 Kafka 数据,处理后写入 ES、Clickhouse 等。
MIT License
1.03k stars 238 forks source link

如果kafka中收到value是null的消息会关闭连接 #131

Closed kingwide closed 3 years ago

kingwide commented 3 years ago

发现一个现象如果kafka中有null的消息,gohangout会停止工作,代码input_box.go中有收到nil会关闭kafka的连接的逻辑,请问大佬这是出于什么考虑的呢 for box.stop { event = box.input.ReadOneEvent() if event == nil { if !box.stop { glog.Info("receive nil message. shutdown...") box.shutdown() } return } firstNode.Process(event)

childe commented 3 years ago

最开始的时候,这个方便测试,而且程序退出后会被k8s或者其他管理平台拉起来,所以没啥影响。 后面一同学加了config reload功能,但导致程序不会退出了,类似于僵死了,就有问题了。 我最近改了代码,修复了一下。一会合进来。

发自我的iPhone

------------------ 原始邮件 ------------------ 发件人: kingwide @.> 发送时间: 2021年4月7日 10:12 收件人: childe/gohangout @.> 抄送: Subscribed @.***> 主题: 回复:[childe/gohangout] 如果kafka中收到value是null的消息会关闭连接 (#131)

kingwide commented 3 years ago

如果删除这段逻辑会对其他功能有影响吗

childe commented 3 years ago

如果删除这段逻辑会对其他功能有影响吗

没有影响

childe commented 3 years ago

v1.7.0 加了一个参数,可以选择是退出,还是忽略。

WideLee commented 3 years ago

我倒觉得收到nil退出是一个挺合理的设计,不然input filter里边ReadOneEvent只有一个返回参数,就不能主动进行退出了,这里的场景大概是这样的

  1. 进程收到sigint或者sigterm,要执行清理操作
  2. 通知input shutdown,关闭与远端input(kafka等)的连接,但内存里边可能还有一些已经拉回来的数据没处理完
  3. 流水线仍在读取ReadOneEvent,将以及读回来的数据交给后续的filter与output处理完
  4. ReadOneEvent返回nil,告诉流水线已经发完消息了,流水线去shutdown output清理资源

一般遇到错误的时候,我觉得不要返回nil,而是进入下一个循环等待会更好,比如这样,或者ReadOneEvent加一个返回参数,isFinish bool,但这样改动有点大不好兼容

func (p *SomeInput) ReadOneEvent() map[string]interface{} {
    for {
        event, ok := func() (map[string]interface{}, bool) {
            cm, more := <-p.messages
            if !more {
                return nil, true
            }

            err := json.Unmarshal(msg.Payload(), &event)
            if err != nil {
                // 处理异常,输出日志             
                return nil, false
            }

            return event, true
        }()

        if ok {
            return event
        }
    }
}