knative-extensions / eventing-kafka

Kafka integrations with Knative Eventing.
Apache License 2.0

MTChannelBasedBroker - Messages sent to Incorrect Triggers #1379

Closed lamluongbinh closed 8 months ago

lamluongbinh commented 1 year ago

Describe the bug: Some of my messages are sent to the wrong trigger.

Detailed logs that I get from my dead-letter service:

Cloudevent is discarded due to maximum retry. Labels: command-executer, Message: {"id":"5ab587a9-6ec3-478b-8bac-5ed754b11466","time":"2023-08-21T09:06:07.703Z","type":"com.messaging.challenge.progress.status.updated","source":"challenge-progress-calculator-worker","specversion":"1.0","datacontenttype":"application/json; charset=utf-8","data":{message},"knativearrivaltime":"2023-08-21T09:06:08.63480213Z","knativebrokerttl":"255","knativeerrorcode":"400","knativeerrordata":"","knativeerrordest":"http://broker-filter.knative-eventing.svc.cluster.local/triggers/production/command-executer/d30776b8-0562-4357-b773-9ab7e5ab1dcd","partitionkey":"com.messaging.challenge.progress.status.updated.64c777016969e72de8fbc012"}
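For reference, this is roughly how my dead-letter service surfaces those fields. A minimal sketch using the CloudEvents Go SDK (the handler name and log format are mine, not the real service); the knativeerrordest extension encodes the broker-filter path /triggers/<namespace>/<trigger-name>/<trigger-uid> of the trigger that failed delivery:

package main

import (
	"context"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

// receive logs the knativeerror* extensions that the channel attaches to
// dead-lettered events, so the failing trigger can be identified from the
// knativeerrordest path.
func receive(ctx context.Context, event cloudevents.Event) {
	ext := event.Extensions()
	log.Printf("dead letter: id=%s type=%s errorcode=%v errordest=%v",
		event.ID(), event.Type(), ext["knativeerrorcode"], ext["knativeerrordest"])
}

func main() {
	client, err := cloudevents.NewClientHTTP()
	if err != nil {
		log.Fatalf("failed to create CloudEvents client: %v", err)
	}
	log.Fatal(client.StartReceiver(context.Background(), receive))
}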

My current trigger:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: challenge-worker-progress-status-updated
  annotations:
    kafka.eventing.knative.dev/delivery.order: ordered
spec:
  broker: default
  filter:
    attributes:
      type: com.messaging.challenge.progress.status.updated
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: challenge-worker
    uri: /cloudevents
  delivery:
    retry: 20
    deadLetterSink:
      ref:
        apiVersion: v1
        kind: Service
        name: challenge-worker
      uri: /cloudevents/deadletters
    backoffPolicy: "linear"
    backoffDelay: "PT1S"
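For the delivery spec above (retry: 20, linear policy, PT1S delay), my understanding of the retry timing, assuming the documented Knative DeliverySpec semantics (linear waits backoffDelay*attempt, exponential waits backoffDelay*2^attempt), is sketched below; the worst-case retry window works out to 210 seconds before the event is dead-lettered:

package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before the given retry attempt (1-based),
// following the Knative DeliverySpec semantics: linear waits
// backoffDelay*attempt, exponential waits backoffDelay*2^attempt.
func backoff(policy string, base time.Duration, attempt int) time.Duration {
	if policy == "exponential" {
		return base * time.Duration(1<<attempt)
	}
	return base * time.Duration(attempt) // linear
}

func main() {
	// retry: 20, backoffPolicy: linear, backoffDelay: PT1S (1 second)
	var total time.Duration
	for attempt := 1; attempt <= 20; attempt++ {
		total += backoff("linear", time.Second, attempt)
	}
	fmt.Println("worst-case retry window:", total) // 1+2+...+20 = 210s
}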

The trigger that the message was incorrectly routed to:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: command-executer
spec:
  broker: default-20220303
  filter:
    attributes:
      type: com.svc.commands.upsert
      dedicatedgroup: "none"
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: command-executor
    uri: /cloudevents/upsert
  delivery:
    retry: 600
    backoffPolicy: "linear"
    backoffDelay: "PT2S"
    deadLetterSink:
      ref:
        apiVersion: v1
        kind: Service
        name: command-executor
      uri: /cloudevents/deadletters?labels[]=command-executer
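Given those filters, the dead-lettered event should never match this trigger: its type is com.messaging.challenge.progress.status.updated, while the filter requires com.svc.commands.upsert. A minimal model of the exact-match Trigger filter semantics as I understand them (hypothetical helper, not the broker-filter code):

package main

import "fmt"

// matches models Trigger filter semantics: every attribute in the filter
// must equal the corresponding attribute on the event.
func matches(filter, eventAttrs map[string]string) bool {
	for key, want := range filter {
		if got, ok := eventAttrs[key]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	commandExecuterFilter := map[string]string{
		"type":           "com.svc.commands.upsert",
		"dedicatedgroup": "none",
	}
	deadLetteredEvent := map[string]string{
		"type":   "com.messaging.challenge.progress.status.updated",
		"source": "challenge-progress-calculator-worker",
	}
	// Expected output: false -- the event should never reach this trigger.
	fmt.Println(matches(commandExecuterFilter, deadLetteredEvent))
}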

Dispatcher Handler: https://github.com/knative-extensions/eventing-kafka/blob/cbcf8a7c8112ec91956324b12a587945d2329041/pkg/channel/distributed/dispatcher/dispatcher/handler.go#L66-L95

Message Dispatcher: https://github.com/knative/eventing/blob/c3758271fde327f08735554cd64ff95b57cddde8/pkg/channel/message_dispatcher.go#L98-L190
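My mental model of the MTChannelBasedBroker flow, based on the code linked above: the dispatcher fans every event on a broker's channel out to each trigger's broker-filter endpoint, and broker-filter applies the attribute filter before forwarding to the subscriber. A simplified sketch of that fan-out step (hypothetical types, not the real implementation):

package main

import "fmt"

// Subscription models one trigger's subscription on a broker channel: the
// dispatcher posts every event to the trigger's broker-filter path, and
// broker-filter decides whether the event matches the filter.
type Subscription struct {
	Trigger   string
	FilterURL string
}

// fanOut delivers one event to every subscription; a mix-up at this layer
// (or in the filter paths) could route events to the wrong trigger.
func fanOut(eventID string, subs []Subscription, send func(url string) error) {
	for _, sub := range subs {
		if err := send(sub.FilterURL); err != nil {
			fmt.Printf("event %s failed for trigger %s: %v\n", eventID, sub.Trigger, err)
		}
	}
}

func main() {
	// Note: the two triggers above reference different brokers (default vs
	// default-20220303), so they should be subscriptions on *different*
	// channels; they are listed together here only to illustrate the fan-out.
	subs := []Subscription{
		{"challenge-worker-progress-status-updated",
			"http://broker-filter.knative-eventing.svc.cluster.local/triggers/production/challenge-worker-progress-status-updated/<uid>"},
		{"command-executer",
			"http://broker-filter.knative-eventing.svc.cluster.local/triggers/production/command-executer/d30776b8-0562-4357-b773-9ab7e5ab1dcd"},
	}
	fanOut("5ab587a9-6ec3-478b-8bac-5ed754b11466", subs, func(url string) error {
		fmt.Println("POST", url)
		return nil
	})
}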

Some errors that I found in the broker-filter logs:

{"level":"error","ts":"2023-08-20T19:07:51.539Z","logger":"mt_broker_filter","caller":"filter/filter_handler.go:216","msg":"failed to send event","commit":"c375827","error":"failed to dispatch message: Post \"http://challenge-worker.production.svc.cluster.local/cloudevents/avro\": dial tcp 172.20.1.65:80: connect: connection refused","stacktrace":"knative.dev/eventing/pkg/broker/filter.(*Handler).send\n\tknative.dev/eventing/pkg/broker/filter/filter_handler.go:216\nknative.dev/eventing/pkg/broker/filter.(*Handler).ServeHTTP\n\tknative.dev/eventing/pkg/broker/filter/filter_handler.go:209\ngo.opencensus.io/plugin/ochttp.(*Handler).ServeHTTP\n\tgo.opencensus.io@v0.23.0/plugin/ochttp/server.go:92\nknative.dev/pkg/network/handlers.(*Drainer).ServeHTTP\n\tknative.dev/pkg@v0.0.0-20221011175852-714b7630a836/network/handlers/drain.go:113\nnet/http.serverHandler.ServeHTTP\n\tnet/http/server.go:2947\nnet/http.(*conn).serve\n\tnet/http/server.go:1991"}
{"level":"error","ts":"2023-08-20T19:22:35.698Z","logger":"mt_broker_filter","caller":"filter/filter_handler.go:216","msg":"failed to send event","commit":"c375827","error":"failed to dispatch message: Post \"http://challenge-worker.production.svc.cluster.local/cloudevents\": read tcp 10.103.174.167:60392->172.20.1.65:80: read: connection reset by peer","stacktrace":"knative.dev/eventing/pkg/broker/filter.(*Handler).send\n\tknative.dev/eventing/pkg/broker/filter/filter_handler.go:216\nknative.dev/eventing/pkg/broker/filter.(*Handler).ServeHTTP\n\tknative.dev/eventing/pkg/broker/filter/filter_handler.go:209\ngo.opencensus.io/plugin/ochttp.(*Handler).ServeHTTP\n\tgo.opencensus.io@v0.23.0/plugin/ochttp/server.go:92\nknative.dev/pkg/network/handlers.(*Drainer).ServeHTTP\n\tknative.dev/pkg@v0.0.0-20221011175852-714b7630a836/network/handlers/drain.go:113\nnet/http.serverHandler.ServeHTTP\n\tnet/http/server.go:2947\nnet/http.(*conn).serve\n\tnet/http/server.go:1991"}
{"level":"error","ts":"2023-08-22T04:32:40.817Z","logger":"mt_broker_filter","caller":"filter/filter_handler.go:216","msg":"failed to send event","commit":"c375827","error":"failed to dispatch message: Post \"http://command-executor.productiond.svc.cluster.local/cloudevents/upsert\": EOF","stacktrace":"knative.dev/eventing/pkg/broker/filter.(*Handler).send\n\tknative.dev/eventing/pkg/broker/filter/filter_handler.go:216\nknative.dev/eventing/pkg/broker/filter.(*Handler).ServeHTTP\n\tknative.dev/eventing/pkg/broker/filter/filter_handler.go:209\ngo.opencensus.io/plugin/ochttp.(*Handler).ServeHTTP\n\tgo.opencensus.io@v0.23.0/plugin/ochttp/server.go:92\nknative.dev/pkg/network/handlers.(*Drainer).ServeHTTP\n\tknative.dev/pkg@v0.0.0-20221011175852-714b7630a836/network/handlers/drain.go:113\nnet/http.serverHandler.ServeHTTP\n\tnet/http/server.go:2947\nnet/http.(*conn).serve\n\tnet/http/server.go:1991"}

Expected behavior: Messages are sent to the correct trigger.

To Reproduce: I don't know how to reproduce it.

Knative release version: I am using the MTChannelBasedBroker with knative-eventing. My current knative-eventing version: v1.8.6. My current eventing-kafka version: v1.8.1.

github-actions[bot] commented 9 months ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.