knative / eventing

Event-driven application platform for Kubernetes
https://knative.dev/docs/eventing
Apache License 2.0

[Support Request | Help] Backend function (sink) was triggered multiple times within one function invocation #7515

Closed piyoki closed 9 months ago

piyoki commented 9 months ago

Expected Behavior

I expected my backend function to be triggered exactly once per function invocation.

Any help would be much appreciated. Thanks.

Actual Behavior

The backend function was triggered 10 times per function invocation.

Logs from the imc-dispatcher

{"level":"error","ts":"2023-12-18T13:57:05.059Z","logger":"inmemorychannel-dispatcher","caller":"fanout/fanout_event_handler.go:303","msg":"Fanout had an error","commit":"747a9f5","knative.dev/pod":"imc-dispatcher-69d78445c5-prbhr","knative.dev/controller":"knative.dev.eventing.pkg.reconciler.inmemorychannel.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.InMemoryChannel","knative.dev/traceid":"a6acd47a-55b3-4ace-a0fd-ee45f816629d","knative.dev/key":"staging/default-kne-trigger","error":"unable to complete request to http://broker-filter.knative-eventing.svc.cluster.local/triggers/staging/helloworld-cloudevent-function/87db3ff9-b234-46d5-b7dc-92cb788c1bdc: unexpected HTTP response, expected 2xx, got 502","stacktrace":"knative.dev/eventing/pkg/channel/fanout.(*FanoutEventHandler).dispatch\n\tknative.dev/eventing/pkg/channel/fanout/fanout_event_handler.go:303\nknative.dev/eventing/pkg/channel/fanout.NewFanoutEventHandler.createEventReceiverFunction.func1.1\n\tknative.dev/eventing/pkg/channel/fanout/fanout_event_handler.go:223"}
{"level":"error","ts":"2023-12-18T14:00:30.641Z","logger":"inmemorychannel-dispatcher","caller":"fanout/fanout_event_handler.go:303","msg":"Fanout had an error","commit":"747a9f5","knative.dev/pod":"imc-dispatcher-69d78445c5-prbhr","knative.dev/controller":"knative.dev.eventing.pkg.reconciler.inmemorychannel.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.InMemoryChannel","knative.dev/traceid":"a6acd47a-55b3-4ace-a0fd-ee45f816629d","knative.dev/key":"staging/default-kne-trigger","error":"unable to complete request to http://broker-filter.knative-eventing.svc.cluster.local/triggers/staging/helloworld-cloudevent-function/87db3ff9-b234-46d5-b7dc-92cb788c1bdc: unexpected HTTP response, expected 2xx, got 502","stacktrace":"knative.dev/eventing/pkg/channel/fanout.(*FanoutEventHandler).dispatch\n\tknative.dev/eventing/pkg/channel/fanout/fanout_event_handler.go:303\nknative.dev/eventing/pkg/channel/fanout.NewFanoutEventHandler.createEventReceiverFunction.func1.1\n\tknative.dev/eventing/pkg/channel/fanout/fanout_event_handler.go:223"}

Steps to Reproduce the Problem

  1. Create a default broker in the staging namespace
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: staging
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
  2. Create a trigger that maps to the backend function
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: helloworld-cloudevent-function
  namespace: staging
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.staging.helloworld-cloudevent-function
      source: dev.knative.staging/eventing
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: helloworld-cloudevent-function
  3. Invoke the function with a curl command or an API client tool such as Insomnia
curl --request POST \
  --url http://broker-ingress.knative-eventing.svc.cluster.local/staging/default  \
  --header 'Ce-Id: 730bb8d8-0b5a-4184-9c69-5840078ef7d6' \
  --header 'Ce-Source: dev.knative.staging/eventing' \
  --header 'Ce-Type: dev.knative.staging.helloworld-cloudevent-function' \
  --header 'Ce-specversion: 1.0' \
  --header 'Content-Type: application/json' \
  --header 'User-Agent: insomnia/8.4.5' \
  --data '{"msg": "ping"}'

Additional Info

Installation

Knative Operator -> Knative-Eventing

Logs from the backend

Source Code: https://github.com/rocats/helloworld-cloudevent-function

...
10.0.0.47 - - [18/Dec/2023 14:05:17] "POST / HTTP/1.1" 200 -
2023-12-18 14:05:43 [debug    ] Function loaded
2023-12-18 14:05:43 [debug    ] Received event with ID: 730bb8d8-0b5a-4184-9c69-5840078ef7d6 and data {'msg': 'ping'}
2023-12-18 14:05:43 [debug    ] Cloudevent: {'attributes': {'specversion': '1.0', 'id': '730bb8d8-0b5a-4184-9c69-5840078ef7d6', 'source': 'dev.knative.staging/eventing', 'type': 'dev.knative.staging.helloworld-cloudevent-function', 'datacontenttype': 'application/json', 'knativearrivaltime': '2023-12-18T14:04:52.220290693Z', 'time': '2023-12-18T14:05:43.305629+00:00'}, 'data': {'msg': 'ping'}}
2023-12-18 14:05:43 [info     ] OK
10.0.0.47 - - [18/Dec/2023 14:05:43] "POST / HTTP/1.1" 200 -
2023-12-18 14:06:21 [debug    ] Function loaded
2023-12-18 14:06:21 [debug    ] Received event with ID: 536808d3-88be-4077-9d7a-a3f162705f79 and data {'msg': 'ping'}
2023-12-18 14:06:21 [debug    ] Cloudevent: {'attributes': {'specversion': '1.0', 'id': '536808d3-88be-4077-9d7a-a3f162705f79', 'source': 'dev.knative.staging/eventing', 'type': 'dev.knative.staging.helloworld-cloudevent-function', 'datacontenttype': 'application/json', 'knativearrivaltime': '2023-12-18T14:02:56.76725493Z', 'time': '2023-12-18T14:06:21.463494+00:00'}, 'data': {'msg': 'ping'}}
2023-12-18 14:06:21 [info     ] OK
10.0.0.47 - - [18/Dec/2023 14:06:21] "POST / HTTP/1.1" 200 -
2023-12-18 14:06:34 [debug    ] Function loaded
2023-12-18 14:06:34 [debug    ] Received event with ID: 730bb8d8-0b5a-4184-9c69-5840078ef7d6 and data {'msg': 'ping'}
2023-12-18 14:06:34 [debug    ] Cloudevent: {'attributes': {'specversion': '1.0', 'id': '730bb8d8-0b5a-4184-9c69-5840078ef7d6', 'source': 'dev.knative.staging/eventing', 'type': 'dev.knative.staging.helloworld-cloudevent-function', 'datacontenttype': 'application/json', 'knativearrivaltime': '2023-12-18T14:04:52.220290693Z', 'time': '2023-12-18T14:06:34.515597+00:00'}, 'data': {'msg': 'ping'}}
2023-12-18 14:06:34 [info     ] OK
10.0.0.47 - - [18/Dec/2023 14:06:34] "POST / HTTP/1.1" 200 -
2023-12-18 14:08:16 [debug    ] Function loaded
2023-12-18 14:08:16 [debug    ] Received event with ID: 730bb8d8-0b5a-4184-9c69-5840078ef7d6 and data {'msg': 'ping'}
2023-12-18 14:08:16 [debug    ] Cloudevent: {'attributes': {'specversion': '1.0', 'id': '730bb8d8-0b5a-4184-9c69-5840078ef7d6', 'source': 'dev.knative.staging/eventing', 'type': 'dev.knative.staging.helloworld-cloudevent-function', 'datacontenttype': 'application/json', 'knativearrivaltime': '2023-12-18T14:04:52.220290693Z', 'time': '2023-12-18T14:08:16.925878+00:00'}, 'data': {'msg': 'ping'}}
2023-12-18 14:08:16 [info     ] OK
10.0.0.47 - - [18/Dec/2023 14:08:16] "POST / HTTP/1.1" 200 -
...

Cali0707 commented 9 months ago

Hey @yqlbu would you be able to provide which versions of k8s and knative you are using here?

Cali0707 commented 9 months ago

/triage needs-user-input

Cali0707 commented 9 months ago

Also, one thing to check is that the responses from your function should be valid cloud events. As far as I can tell, broker checks if the response is a valid cloud event, and returns an error otherwise (triggering a retry of the request up to the max in the delivery spec).

cc @matzew @creydr in case I'm missing something here

Cali0707 commented 9 months ago

See the conversation here for more

piyoki commented 9 months ago

> Also, one thing to check is that the responses from your function should be valid cloud events. As far as I can tell, broker checks if the response is a valid cloud event, and returns an error otherwise (triggering a retry of the request up to the max in the delivery spec).
>
> cc @matzew @creydr in case I'm missing something here

Thanks for your prompt response; I appreciate the help. My backend function is a very simple helloworld function powered by functions-framework. The framework states that it natively supports CloudEvents and is compatible with Knative environments. More precisely, it responds with the message "OK" and a 200 status code if the message is consumed and processed successfully. I would like to know more about how the Knative broker handles HTTP responses and what format it expects. Thanks.

Source code available at https://github.com/rocats/helloworld-cloudevent-function

Cali0707 commented 9 months ago

@yqlbu the broker expects a cloud event which corresponds to the spec. Maybe you can use the python SDK to make a cloud event? Or alternatively, return '', 200 may work, as Pierangelo described here. However, I am not very familiar with flask.

For some sample nodejs functions, feel free to look around at the code samples in https://github.com/Cali0707/knative-eda-demos
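
As a minimal sketch of the `return '', 200` suggestion above: the handler below processes the event and then answers with an empty body, so there is nothing in the response for the broker to validate as a CloudEvent. It uses a Flask-style `(body, status)` return tuple without importing Flask. The name `handle_event`, its parameters, and the lowercase header keys are assumptions made for illustration, not part of functions-framework.

```python
import json


def handle_event(headers: dict, body: bytes) -> tuple:
    """Consume a binary-mode CloudEvent and acknowledge it with an empty reply.

    Hypothetical handler: the name and signature are illustrative only.
    Header keys are assumed to be lowercase here.
    """
    event_id = headers.get("ce-id", "<unknown>")
    data = json.loads(body) if body else None
    print(f"Received event with ID: {event_id} and data {data}")

    # Empty body: no reply event, only an acknowledgement.
    # 204 No Content is the status that shows up in the follow-up log in this thread.
    return "", 204
```

Returning a non-empty plain-text body such as "OK" is what the thread identifies as the likely cause of the retries, so the important part of this sketch is the empty body rather than the exact 2xx code.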

piyoki commented 9 months ago

> See the conversation here for more

Thanks for pointing out this similar issue. I will give it a try tomorrow.

piyoki commented 9 months ago

> @yqlbu the broker expects a cloud event which corresponds to the spec. Maybe you can use the python SDK to make a cloud event? Or alternatively, return '', 200 may work, as Pierangelo described here. However, I am not very familiar with flask.
>
> For some sample nodejs functions, feel free to look around at the code samples in https://github.com/Cali0707/knative-eda-demos

Hey @Cali0707, I can confirm that the proposed solution works like a charm!

2023-12-19 00:31:40 [debug    ] Function loaded
2023-12-19 00:31:40 [debug    ] Received event with ID: 730bb8d8-0b5a-4184-9c69-5840078ef7d6 and data {'msg': 'ping'}
2023-12-19 00:31:40 [debug    ] Cloudevent: {'attributes': {'specversion': '1.0', 'id': '730bb8d8-0b5a-4184-9c69-5840078ef7d6', 'source': 'dev.knative.staging/eventing', 'type': 'dev.knative.staging.helloworld-cloudevent-function', 'datacontenttype': 'application/json', 'knativearrivaltime': '2023-12-19T00:31:40.959135649Z', 'time': '2023-12-19T00:31:40.963316+00:00'}, 'data': {'msg': 'ping'}}
2023-12-19 00:31:40 [info     ] OK
10.0.0.47 - - [19/Dec/2023 00:31:40] "POST / HTTP/1.1" 204 -

Yet, I think it'd be better if we mention these particular notes somewhere in the documentation. We just need to highlight the default behavior of the Knative Broker including how it handles responses and how it considers responses as valid.

piyoki commented 9 months ago

Also, I wonder if the Knative Broker is capable of returning a raw response from the function along with a custom status code.

e.g.

{
   "message": "sent from my function"
}

200

Cali0707 commented 9 months ago

> Yet, I think it'd be better if we mention these particular notes somewhere in the documentation. We just need to highlight the default behavior of the Knative Broker including how it handles responses and how it considers responses as valid.

@yqlbu for sure, would you be interested in contributing this improvement? If you are, then the documentation is all in the markdown files in https://github.com/knative/docs - we always love docs contributions!!

piyoki commented 9 months ago

> > Yet, I think it'd be better if we mention these particular notes somewhere in the documentation. We just need to highlight the default behavior of the Knative Broker including how it handles responses and how it considers responses as valid.
>
> @yqlbu for sure, would you be interested in contributing this improvement? If you are, then the documentation is all in the markdown files in https://github.com/knative/docs - we always love docs contributions!!

No problem, my pleasure. Cheers.

Cali0707 commented 9 months ago

> Also, I wonder if the Knative Broker is capable of returning a raw response from the function along with a custom status code.
>
> e.g.
>
> {
>    "message": "sent from my function"
> }
>
> 200

For this, you need to look at the cloudevents format. Knative brokers only accept http cloudevents. To create one, you can either read the spec and set the appropriate http headers yourself, or use the cloudevents python sdk (or similar for other languages)
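
To make the "set the appropriate http headers yourself" option concrete, here is a minimal sketch that builds a reply in the CloudEvents HTTP binary content mode using only the standard library: the required attributes (`specversion`, `id`, `type`, `source`) travel as `ce-*` headers and the payload is the body. The helper name `make_reply` and the reply `type` and `source` values are hypothetical. The reply type is deliberately different from the type in the Trigger filter, on the assumption that a reply event is routed back through the broker and would otherwise match the same Trigger again.

```python
import json
import uuid


def make_reply(data: dict, event_type: str, source: str) -> tuple:
    """Build a Flask-style (body, status, headers) reply carrying a CloudEvent.

    Binary content mode: attributes go in ce-* headers, the payload is the body.
    Hypothetical helper, not part of any SDK.
    """
    headers = {
        "ce-specversion": "1.0",
        "ce-id": str(uuid.uuid4()),  # must be unique per event for a given source
        "ce-type": event_type,
        "ce-source": source,
        "content-type": "application/json",
    }
    return json.dumps(data), 200, headers


body, status, headers = make_reply(
    {"message": "sent from my function"},
    # Both values below are made up for this example.
    event_type="dev.knative.staging.helloworld-cloudevent-function.reply",
    source="dev.knative.staging/helloworld-cloudevent-function",
)
```

The CloudEvents Python SDK mentioned above can produce the same headers and body; the hand-rolled version is shown only because it makes the wire format visible.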

piyoki commented 9 months ago

> > Also, I wonder if the Knative Broker is capable of returning a raw response from the function along with a custom status code. e.g.
> >
> > {
> >    "message": "sent from my function"
> > }
> >
> > 200
>
> For this, you need to look at the cloudevents format. Knative brokers only accept http cloudevents. To create one, you can either read the spec and set the appropriate http headers yourself, or use the cloudevents python sdk (or similar for other languages)

Sorry about the confusion. Let me clarify. I want to use the Knative broker to invoke my function and then return the response originating from my backend function, with a custom response status code. How can I achieve that? Currently, I only receive a 202 response code with an empty response body.

Cali0707 commented 9 months ago

Sorry if I'm not understanding your question properly. If you want to return the response from your backend function you will need to use a cloudevent. Cloudevents can have arbitrary data in the data field, but need some specific headers to be set. You can read more in the links I've sent above for how that will work. Knative brokers will not accept anything except for valid cloudevents, so if you want to return data from your function you will need to use a cloudevent to do so. The good news is they are a lightweight set of headers in your http request/response.

According to the Knative spec the 202 status code indicates that the event was delivered successfully, and the 200 status code indicates that the event was delivered successfully and the event was in the reply. However in practice I believe that every 2xx response code is treated as a success
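
The rules described in this thread can be summarised as a small decision function. This is a simplified model of the behaviour as discussed here, not the broker's actual code: it only recognises binary-mode CloudEvents (structured mode is ignored), and the function and constant names are invented for the sketch.

```python
REQUIRED_CE_HEADERS = ("ce-specversion", "ce-id", "ce-type", "ce-source")


def classify_response(status: int, headers: dict, body: bytes) -> str:
    """Return 'ack', 'reply', or 'retry' for a subscriber response.

    Simplified model of the behaviour described in this thread,
    not the actual broker implementation.
    """
    if not 200 <= status < 300:
        return "retry"  # non-2xx: delivery failed
    if not body:
        return "ack"  # 2xx with an empty body: delivered, no reply event
    present = {name.lower() for name in headers}
    if all(name in present for name in REQUIRED_CE_HEADERS):
        return "reply"  # 2xx carrying a binary-mode CloudEvent
    return "retry"  # 2xx with a body that is not a CloudEvent
```

Under this model, the original function's response (status 200, body "OK", no `ce-*` headers) falls into the last branch and is retried, while the fixed function's empty 204 response is a plain acknowledgement.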

piyoki commented 9 months ago

> Sorry if I'm not understanding your question properly. If you want to return the response from your backend function you will need to use a cloudevent. Cloudevents can have arbitrary data in the data field, but need some specific headers to be set. You can read more in the links I've sent above for how that will work. Knative brokers will not accept anything except for valid cloudevents, so if you want to return data from your function you will need to use a cloudevent to do so. The good news is they are a lightweight set of headers in your http request/response.
>
> According to the Knative spec the 202 status code indicates that the event was delivered successfully, and the 200 status code indicates that the event was delivered successfully and the event was in the reply. However in practice I believe that every 2xx response code is treated as a success

Thanks for clarifying all the necessary details when it comes to how cloudevents are handled by the broker under the hood. Now, I think everything makes sense to me.

I believe this is something we should make clear to future Knative users. TBH, the learning curve is kinda steep. I will try my best to share what I have learned with Knative users and with communities outside of Knative.

Cali0707 commented 9 months ago

Thanks for your help! If you need any assistance in contributing to the knative docs, please feel free to ping me on github or on the CNCF slack instance (there are many knative channels there, all under the naming convention #knative-*)

Cali0707 commented 9 months ago

Is this issue able to be closed now?

piyoki commented 9 months ago

> Is this issue able to be closed now?

Sure.

piyoki commented 9 months ago

Issue resolved. Thanks for all the kind help.