vmware-archive / kubeless

Kubernetes Native Serverless Framework
https://kubeless.io
Apache License 2.0

Kafka messages lost when triggered function removed #958

Open mmindenhall opened 6 years ago

mmindenhall commented 6 years ago

Kafka's default message semantics are "at least once", meaning that it guarantees delivery of produced messages to consumers, and in some corner cases, messages may be delivered more than once. These semantics become "at most once" when the function associated with a trigger is temporarily not available.

What happened:

In the following scenario, I saw messages lost:

  1. Deploy a function and a Kafka trigger for the function (e.g., following the steps here)
  2. Tail the logs of the function pod in a terminal window
  3. Produce some messages on the topic (they'll be seen in the function logs)
  4. Delete the function, but leave the trigger in place
  5. Produce some more messages on the topic
  6. Deploy the function, tail the logs again when ready
  7. Observe that messages produced while the function was not present are lost

What you expected to happen:

There should be a way to specify the desired behavior of a trigger when the associated function is not available. The current behavior (dropping messages) can be the default, but there should be an option to ensure delivery of all messages by not committing offsets within the trigger until messages are delivered to a function. This may result in a flood of function calls if a trigger is left deployed for a long period of time without the associated function available.

How to reproduce it (as minimally and precisely as possible):

See above.

Environment:

andresmgot commented 6 years ago

Hi @mmindenhall,

In Kubeless, messages are delivered "at most once"; at the moment we cannot guarantee that the function will receive a message. This is because message consumers currently live in a different Pod than the function. If for some reason the function Pod is not healthy, the consumer receives an error code on its request and discards the message (you should be able to see those errors in the controller logs). I agree that implementing an "at least once" policy could be a useful feature, though.
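
For illustration, a minimal sketch of that discard path (this is not the actual kafka-trigger code; the helper name and function URL are placeholders): the record is posted to the function's Service once, and any failure is only logged, so the message is effectively dropped.

```go
package consumer

import (
	"bytes"
	"log"
	"net/http"
)

// sendMessage is a hypothetical helper illustrating "at most once" delivery:
// the record is posted to the function's Service a single time, and any
// failure is only logged, so the message is effectively dropped.
func sendMessage(functionURL string, record []byte) {
	resp, err := http.Post(functionURL, "application/json", bytes.NewReader(record))
	if err != nil {
		// Function Pod missing or unreachable: nothing retries this record.
		log.Printf("error invoking function, message dropped: %v", err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		log.Printf("function returned %d, message dropped", resp.StatusCode)
	}
}
```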

Having said that, in the specific scenario you describe, when a function is deleted the Kafka controller should detect that there is a consumer associated with the deleted function and delete that consumer. In the logs you should see something like `We got a Kafka trigger TRIGGER that is associated with deleted function FUNC so cleanup Kafka consumer`. If that was the only function listening on that topic, the messages should start accumulating in the queue from that moment. Note that if there are other consumers for the same topic, they will consume those messages; could that be your situation?

mmindenhall commented 5 years ago

Hi @andresmgot,

Thanks for the response! An "at least once" policy would be critical for us. I think it is doable even with the architecture you describe:

  1. Within the Kafka controller, do not auto-commit offsets. Commit them only when messages have been delivered (by successfully invoking a consumer function).
  2. If no consumer can be invoked (i.e., no longer exists or is unresponsive), tear down the connection to the broker without committing any additional offsets.
  3. Wait for a consumer function to become available again.
  4. Reconnect to the broker (with the same consumer group id as before), at which point consumption resumes from the last committed offset, so all uncommitted messages are received again (even if some of them had already been delivered before disconnecting).

Technically, even this can't be considered "at least once", as the function might be successfully invoked, but fail to process the message. To really close the loop, the Kafka controller should only commit offsets upon a successful return from the function. Since functions just return strings, it would be a challenge to consistently define "success".
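
A rough sketch of what steps 1–4 could look like on top of the sarama consumer-group API (this is not the existing kafka-trigger implementation; the broker address, group id, topic, and function URL are placeholders): the offset is only marked after a 2xx response from the function, and a failed invocation ends the session, so uncommitted messages are redelivered once the function is back.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/Shopify/sarama"
)

// functionHandler marks an offset only after the function invocation returns
// a 2xx status, so anything not yet delivered is re-read when the consumer
// group reconnects ("at least once").
type functionHandler struct {
	functionURL string // placeholder for the function's in-cluster Service URL
}

func (functionHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (functionHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h functionHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		resp, err := http.Post(h.functionURL, "application/json", bytes.NewReader(msg.Value))
		if err != nil {
			// Function Pod gone or unreachable: leave the offset uncommitted
			// and end the session (step 2 of the proposal).
			return fmt.Errorf("function unreachable, offset %d not committed: %v", msg.Offset, err)
		}
		resp.Body.Close()
		if resp.StatusCode >= 300 {
			return fmt.Errorf("function returned %d, offset %d not committed", resp.StatusCode, msg.Offset)
		}
		// Step 1: commit point, the message was delivered successfully.
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Placeholder broker address and group id; reusing the same consumer
	// group id across reconnects is what makes redelivery work (step 4).
	group, err := sarama.NewConsumerGroup([]string{"kafka.kubeless:9092"}, "kubeless-function-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	handler := functionHandler{functionURL: "http://my-function.default.svc.cluster.local:8080"}
	for {
		// Steps 3-4: Consume blocks until the session ends (e.g. the function
		// was unavailable); wait and reconnect, resuming from the last
		// committed offset.
		if err := group.Consume(context.Background(), []string{"test-topic"}, handler); err != nil {
			log.Printf("consumer session ended: %v", err)
		}
		time.Sleep(5 * time.Second)
	}
}
```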

andresmgot commented 5 years ago

That's indeed an interesting approach. Right now I don't have time to work on this, but maybe you are able to give it a try? The code that handles the message consumption is here:

https://github.com/kubeless/kafka-trigger/blob/master/pkg/event-consumers/kafka/kafka-consumer.go#L75

If you are able to work on that, I will be happy to help with any integration needed or with any issues you find.

mmindenhall commented 5 years ago

I'm also pretty busy at the moment, but this might be something I can look at over the Christmas holidays. Thanks!