tx7do / kratos-transport

kratos transport layer extension
MIT License
340 stars 92 forks source link

broker/kafka v1.2.6 以后 kafka consumer 不消费数据了 #89

Closed dgqypl closed 3 months ago

dgqypl commented 3 months ago

debug 了很久,发现是 https://github.com/tx7do/kratos-transport/commit/c1beaf697bba1a23eee824c8aa4cf3aeb5089d1e 这次 commit 导致的。

这次 commit 加了 610 ~ 612 这三行代码: https://github.com/tx7do/kratos-transport/blob/c1beaf697bba1a23eee824c8aa4cf3aeb5089d1e/broker/kafka/kafka.go#L605-L617

导致如果 options.Context 是 Background(),那么程序会阻塞在第 610 行,也永远不会执行到第 613 行的 sub.reader.FetchMessage(options.Context),因而不能消费数据。

在 607 ~ 608 这两行其实已经有处理 options.Context.Done() 这种情况的 case 分支了,不太明白当初这次 commit 是出于什么目的。

dgqypl commented 3 months ago

@haifeiWu

dgqypl commented 3 months ago

Subscribe 方法一开始就为 options.Context 赋了 context.Background(): https://github.com/tx7do/kratos-transport/blob/c1beaf697bba1a23eee824c8aa4cf3aeb5089d1e/broker/kafka/kafka.go#L576-L581 所以如果方法的最后一个参数 opts 中没有包含额外的 Context,那么程序执行到第 610 行用的就是方法一开始创建出的这个 context.Background(),也就会导致程序永久阻塞在 610 行

tx7do commented 3 months ago

这。。。。。