Closed yimuczhou closed 1 year ago
`func registerKafkaSubscribers(ctx context.Context, srv kafka.Server, svc service.CanalService) { registerInsertSensorDataHandler(ctx, srv, svc) }
type HygrothermographHandler func(_ context.Context, topic string, headers broker.Headers, msg *v1.CanalData) error
func handleHygrothermograph(_ context.Context, topic string, headers broker.Headers, msg *v1.CanalData) error { log.Infof("Topic %s, Headers: %+v, Payload: %+v\n", topic, headers, msg) return nil }
func RegisterHygrothermographJsonHandler(fnc HygrothermographHandler) broker.Handler { return func(ctx context.Context, event broker.Event) error { var msg v1.CanalData switch t := event.Message().Body.(type) { case []byte: msg = &v1.CanalData{} if err := json.Unmarshal(t, msg); err != nil { return err } fmt.Println(msg.String()) case v1.CanalData: if err := fnc(ctx, event.Topic(), event.Message().Headers, t); err != nil { return err } default: return fmt.Errorf("unsupported type: %T", t) } return nil } }
func registerInsertSensorDataHandler(ctx context.Context, srv kafka.Server, svc service.CanalService) { //binder := func() broker.Any { return &v1.CanalData{} }
_ = srv.RegisterSubscriber(ctx,
"php-device-beta",
"kratos",
false,
RegisterHygrothermographJsonHandler(handleHygrothermograph),
nil,
)
}`
帮binder注释掉还是会报
你得去看库里边的示例。
是我发送的消息不对吗
大佬你看下是不是,我替换下个引入就可以了
应该是你发送的时候格式不对吧。Offset Explorer里边添加新消息要用:Add Multiple Messages。选择Value。
没错,就是这样的,我都晕死了。居然会搞错了。 我修改了一下,你看看了: github.com/tx7do/kratos-cqrs
没错,就是这样的,我都晕死了。居然会搞错了。 我修改了一下,你看看了: github.com/tx7do/kratos-cqrs
刚本地试了下,可以了
这个太容易出错了,我还是修改一下接口比较好。。。。
按照Kraots示例跑了下,出现这个问题,一般是什么原因导致