tx7do / kratos-transport

kratos transport layer extension
MIT License
348 stars 95 forks source link

异步写 kafka 设置回调函数没生效 #86

Closed dgqypl closed 5 months ago

dgqypl commented 5 months ago

根据 transport/kafka v1.2.10 的更新,在我的应用中初始化 broker 时传入了 Completion func:

    broker := kratosBrokerKafka.NewBroker(
        kratosBroker.WithAddress(c.Brokers...),
        kratosBroker.WithCodec("json"),
        kratosBrokerKafka.WithCompletion(f),
    )

但在程序启动发送消息后发现并没有运行回调函数。

看了一下更新的代码,发现有两处问题: 1、context 中放入的是 &completion,而取出时判断的是 func 本身,导致不会进到 if ok 逻辑里: https://github.com/tx7do/kratos-transport/blob/f2a956a3fe4f73407df20e828c6ac73191de766f/broker/kafka/options.go#L264-L265 https://github.com/tx7do/kratos-transport/blob/f2a956a3fe4f73407df20e828c6ac73191de766f/broker/kafka/kafka.go#L117-L119

2、即便把上一点的放入 context 改为 func 本身而不是指针,程序启动后会报 panic,原因是在初始化 b.writer 时并没有为 b.writer.Writer 赋值: https://github.com/tx7do/kratos-transport/blob/f2a956a3fe4f73407df20e828c6ac73191de766f/broker/kafka/kafka.go#L115 https://github.com/tx7do/kratos-transport/blob/f2a956a3fe4f73407df20e828c6ac73191de766f/broker/kafka/writer.go#L87-L92 因而在 if ok 逻辑里将 Completion 赋给 b.writer.Writer 时,此时 b.writer.Writernil 从而导致 panic


个人觉得是否可以考虑将 Completion 放到 kafkaBroker 的 writerConfig 中: https://github.com/tx7do/kratos-transport/blob/f2a956a3fe4f73407df20e828c6ac73191de766f/broker/kafka/kafka.go#L31-L35

虽然在 kafka-go 中,Completion 只在 Writer 中而没在 WriterConfig 中,这样做显得与 kafka-go 有些不一致,但由于本项目创建 kafkaGo.Writer 是在 CreateProducer 方法中的: https://github.com/tx7do/kratos-transport/blob/f2a956a3fe4f73407df20e828c6ac73191de766f/broker/kafka/writer.go#L106-L131 而 CreateProducer 方法的入参只有三个,如果为 Completion 单独加个入参又显得突兀,所以感觉放到 writerConfig 中即使不是最合适的,目前也想不到有其他更合适的方式了。

dgqypl commented 5 months ago

关联 issue:https://github.com/tx7do/kratos-transport/issues/81

tx7do commented 5 months ago

增加了一个单元测试:Test_Publish_WithCompletion