knative-extensions / eventing-rabbitmq

RabbitMQ eventing components. Knative Source and Broker.
Apache License 2.0

Trigger's DLQ dispatcher acknowledges messages even if dead letter sink is not reachable #1385

Closed lfmatosm closed 2 months ago

lfmatosm commented 6 months ago

Describe the bug Hello! First of all, thanks for your work on eventing-rabbitmq.

I was creating a simple producer-consumer example in Knative using RabbitMQ. I am using a trigger to deliver messages from a broker to my consumer service, and I also configured a dead-letter sink service to handle failures. However, if both services are unresponsive (e.g. if their routes are offline or the services were undeployed), messages are acknowledged by the DLQ dispatcher instead of being kept in the DLQ.

Expected behavior If the sink address is not reachable, messages should remain in the dead-letter queue.

To Reproduce You need to define a trigger with both subscriber and delivery services configured. This example should suffice:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: consumer-trigger
  namespace: spike
spec:
  broker: rabbitmq-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: consumer
      namespace: spike
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: sink
        namespace: spike
    # 1s delay between retries
    backoffDelay: PT1S
    backoffPolicy: linear
    retry: 2

Now make both services unreachable. You can break their routes or, more drastically, undeploy them. Messages will naturally be dead-lettered into the trigger's DLQ, and the dispatcher will try to deliver them to its sink. However, upon erroring, the DLQ dispatcher simply acknowledges the message and logs the error.

You can see the DLQ dispatcher logs with kubectl logs consumer-trigger-dlx-dispatcher-someidentifier -n spike, and something like this will be shown:

{"level":"info","ts":"2024-04-18T13:20:52.637Z","logger":"rabbitmq-dispatcher","caller":"dispatcher/main.go:89","msg":"Setting BackoffDelay","commit":"33a01ed-dirty","backoffDelay":1}
{"level":"info","ts":"2024-04-18T13:20:52.800Z","logger":"rabbitmq-dispatcher","caller":"dispatcher/dispatcher.go:96","msg":"rabbitmq receiver started, exit with CTRL+C","commit":"33a01ed-dirty"}
{"level":"info","ts":"2024-04-18T13:20:52.800Z","logger":"rabbitmq-dispatcher","caller":"dispatcher/dispatcher.go:97","msg":"starting to process messages","commit":"33a01ed-dirty","queue":"t.spike.consumer-trigger.dlq.c82a02fa837b3152e139578d80336dc313","workers":50}
{"level":"warn","ts":"2024-04-18T13:33:22.143Z","logger":"rabbitmq-dispatcher","caller":"dispatcher/dispatcher.go:152","msg":"invalid result type, not HTTP Result: Post \"http://sink.spike.svc.cluster.local\": dial tcp: lookup sink.spike.svc.cluster.local on 10.96.0.10:53: no such host","commit":"33a01ed-dirty"}
{"level":"warn","ts":"2024-04-18T13:33:22.144Z","logger":"rabbitmq-dispatcher","caller":"dispatcher/dispatcher.go:288","msg":"failed to deliver to \"http://sink.spike.svc.cluster.local\" {%!s(*amqp091.Channel=&{{0 {0 0}} {0 0} {0 0} {{0 0} 0 0 {{} 0} {{} 0}} 0xc00021c000 0xc0006f6360 0xc0006f46c0 1 0 false [0xc0006f6540 0xc00010cd20] [] [] [] 0xc0006f21e0 true 0xc00013c7e0 0xe682c0 0xc000370120 0xc0000e21c0 [123 34 115 112 101 99 118 101 114 115 105 111 110 34 58 34 49 46 48 34 44 34 105 100 34 58 34 54 50 54 97 51 99 53 101 45 49 51 51 48 45 52 101 57 49 45 98 98 57 99 45 50 55 48 51 51 52 57 51 49 48 102 98 34 44 34 115 111 117 114 99 101 34 58 34 107 110 97 116 105 118 101 47 101 118 101 110 116 105 110 103 47 112 114 111 100 117 99 101 114 47 101 118 101 110 116 34 44 34 116 121 112 101 34 58 34 100 101 118 46 107 110 97 116 105 118 101 46 112 114 111 100 117 99 101 114 46 101 118 101 110 116 34 44 34 100 97 116 97 99 111 110 116 101 110 116 116 121 112 101 34 58 34 97 112 112 108 105 99 97 116 105 111 110 47 106 115 111 110 34 44 34 100 97 116 97 34 58 123 34 72 101 108 108 111 34 58 34 119 111 114 108 100 33 34 125 125]}) map[content-type:application/json id:936b10cf-529b-4a46-a5fe-8c63860501ed knativeerrorcode:%!s(int32=-1) knativeerrordest:http://consumer.spike.svc.cluster.local source:/apis/v1/namespaces/spike/rabbitmqsources/rabbitmq-source#source-queue specversion:1.0 subject:936b10cf-529b-4a46-a5fe-8c63860501ed time:2024-04-18T13:33:15.109959419Z traceparent:00-c8bb10f7310f95e5b9476c060c7898cf-f2c2a49b335ecb70-00 tracestate: type:dev.knative.rabbitmq.event] application/json  %!s(uint8=2) %!s(uint8=0)    936b10cf-529b-4a46-a5fe-8c63860501ed 0001-01-01 00:00:00 +0000 UTC    ctag-/ko-app/dispatcher-1 %!s(uint32=0) %!s(uint64=1) %!s(bool=false) t.spike.consumer-trigger.dlx.88892693432310cd99264ac5b7100d9a13  
{\"specversion\":\"1.0\",\"id\":\"626a3c5e-1330-4e91-bb9c-2703349310fb\",\"source\":\"knative/eventing/producer/event\",\"type\":\"dev.knative.producer.event\",\"datacontenttype\":\"application/json\",\"data\":{\"Hello\":\"world!\"}}}","commit":"33a01ed-dirty"}

This allowed me to trace the behavior down to this excerpt: https://github.com/knative-extensions/eventing-rabbitmq/blob/knative-v1.13.0/pkg/dispatcher/dispatcher.go#L287-L292

Knative release version Knative serving/eventing version: 1.12.0 Eventing RabbitMQ version: 1.13.0

Additional context If this is known behavior, perhaps we could at least update the documentation before addressing the problem.

ikavgo commented 6 months ago

Hi, thanks for the report.

I think the same situation arises when dead-lettering via RabbitMQ itself. These failure modes are unfortunately not atomic. Please tell me more about your use case.

github-actions[bot] commented 3 months ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.