knative-extensions / eventing-kafka

Kafka integrations with Knative Eventing.

Add support for a ConfigMap containing all the Sarama Kafka configuration #36

Closed eric-sap closed 4 years ago

eric-sap commented 4 years ago

Currently the Sarama configuration is set (in pkg/common/kafka/util/util.go) via a combination of parameters and constants. We would like to use a configmap for these settings where possible and reasonable. The eventing-contrib/kafka implementation already has support for such a map, though with minimal content. We'd like to reuse whatever makes sense (such as the configmap watching) but with our own defined content and parsing. This configmap may also include any other custom eventing-kafka config (i.e. it will not be Sarama-specific).

Details

Watching the Configmap

This is what eventing-contrib/kafka does, which seems like a good starting point for similar work in eventing-kafka:

kafka/channel/pkg/reconciler/controller/controller.go::NewController uses the configmap.Watcher passed in from sharedmain to call cmw.Watch("config-kafka", updateKafkaConfig), where updateKafkaConfig is a function that loads the configmap data and replaces the Reconciler.kafkaConfig with the one parsed out of the configmap.
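For reference, a minimal sketch of that observer pattern (the kafkaConfigHolder type and its field are illustrative stand-ins for the Reconciler, not the actual eventing-contrib code):

package config

import (
	corev1 "k8s.io/api/core/v1"
	"knative.dev/pkg/configmap"
)

// kafkaConfigHolder stands in for the Reconciler fields that hold the parsed settings.
type kafkaConfigHolder struct {
	kafkaConfig map[string]string
}

// updateKafkaConfig is the observer the watcher invokes whenever config-kafka changes.
func (h *kafkaConfigHolder) updateKafkaConfig(cm *corev1.ConfigMap) {
	if cm == nil || cm.Data == nil {
		return // keep the previous configuration
	}
	h.kafkaConfig = cm.Data // real code would parse and validate here
}

// registerWatch mirrors what NewController does with the watcher passed in from sharedmain.
func registerWatch(cmw configmap.Watcher, h *kafkaConfigHolder) {
	cmw.Watch("config-kafka", h.updateKafkaConfig)
}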

The configmap parser itself uses code from knative.dev/pkg/configmap/parse.go that might be useful for our purposes as well (a common way to parse out durations, ints, floats, etc.).
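As a sketch of how those helpers could be used for our own non-Sarama settings (the keys and struct here are made up for illustration):

package config

import (
	"time"

	"knative.dev/pkg/configmap"
)

// exampleSettings holds a few hypothetical eventing-kafka values parsed with the
// knative.dev/pkg/configmap helpers.
type exampleSettings struct {
	Brokers     string
	CacheSize   int32
	IdleTimeout time.Duration
}

func parseExampleSettings(data map[string]string) (*exampleSettings, error) {
	s := &exampleSettings{}
	if err := configmap.Parse(data,
		configmap.AsString("brokers", &s.Brokers),
		configmap.AsInt32("cacheSize", &s.CacheSize),
		configmap.AsDuration("idleTimeout", &s.IdleTimeout),
	); err != nil {
		return nil, err
	}
	return s, nil
}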

If we don't want to switch to using sharedmain, we will need to create the watcher ourselves via sharedmain.SetupConfigMapWatchOrDie, which is what the pkg/common/k8s/observability.go code already does (it is not clear whether there is any benefit to sharing a single watcher or not).
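A rough sketch of that option (the exact return type of SetupConfigMapWatchOrDie depends on the knative.dev/pkg version in use):

package config

import (
	"context"

	"go.uber.org/zap"
	corev1 "k8s.io/api/core/v1"
	"knative.dev/pkg/injection/sharedmain"
)

// startConfigWatcher creates and starts a watcher outside of sharedmain's normal flow,
// mirroring what pkg/common/k8s/observability.go already does for observability config.
func startConfigWatcher(ctx context.Context, logger *zap.SugaredLogger, observer func(*corev1.ConfigMap)) {
	cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger)
	cmw.Watch("config-kafka", observer)
	if err := cmw.Start(ctx.Done()); err != nil {
		logger.Fatalw("Failed to start configmap watcher", zap.Error(err))
	}
}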

Loading Sarama Settings from the Configmap

Whether or not we need the huge block of all of the Sarama settings, putting them as JSON into a configmap is a relatively ordinary procedure and results in something that looks like this:

apiVersion: v1
data:
  sarama: '{"Admin":{"Retry":{"Max":5,"Backoff":100000000},"Timeout":3000000000},"Net":{"MaxOpenRequests":5,"DialTimeout":30000000000,"ReadTimeout":30000000000,"WriteTimeout":30000000000,"TLS":{"Enable":true},"SASL":{"Enable":true,"Mechanism":"PLAIN","Version":1,"Handshake":true,"AuthIdentity":"","User":"TestUsername","Password":"TestPassword","SCRAMAuthzID":"","TokenProvider":null,"GSSAPI":{"AuthType":0,"KeyTabPath":"","KerberosConfigPath":"","ServiceName":"","Username":"","Password":"","Realm":""}},"KeepAlive":30000000000,"LocalAddr":null,"Proxy":{"Enable":false,"Dialer":null}},"Metadata":{"Retry":{"Max":3,"Backoff":250000000},"RefreshFrequency":300000000000,"Full":true,"Timeout":0},"Producer":{"MaxMessageBytes":1000000,"RequiredAcks":1,"Timeout":10000000000,"Compression":0,"CompressionLevel":-1000,"Idempotent":false,"Return":{"Successes":false,"Errors":true},"Flush":{"Bytes":0,"Messages":0,"Frequency":0,"MaxMessages":0},"Retry":{"Max":3,"Backoff":100000000}},"Consumer":{"Group":{"Session":{"Timeout":10000000000},"Heartbeat":{"Interval":3000000000},"Rebalance":{"Timeout":60000000000,"Retry":{"Max":4,"Backoff":2000000000}},"Member":{"UserData":null}},"Retry":{"Backoff":2000000000},"Fetch":{"Min":1,"Default":1048576,"Max":0},"MaxWaitTime":250000000,"MaxProcessingTime":100000000,"Return":{"Errors":false},"Offsets":{"CommitInterval":0,"AutoCommit":{"Enable":true,"Interval":1000000000},"Initial":-1,"Retention":0,"Retry":{"Max":3}},"IsolationLevel":0},"ClientID":"TestClientId","RackID":"","ChannelBufferSize":256,"Version":{}}'
kind: ConfigMap
metadata:
  name: test
  namespace: test

Of course, the point here is that you can omit any entries from the "sarama" JSON that you want to leave at their defaults, so you could have something considerably simpler:

apiVersion: v1
data:
  sarama: '{"Net":{"TLS":{"Enable":true},"SASL":{"Enable":true,"Mechanism":"PLAIN","Version":1,"User":"TestUsername","Password":"TestPassword"}},"Metadata":{"RefreshFrequency":300000000000},"ClientID":"TestClientId"}'
kind: ConfigMap
metadata:
  name: test
  namespace: test

Merging the JSON with the default sarama.Config is also not complex:

config := NewSaramaConfig(...) // existing helper that supplies the default sarama.Config
if err := json.Unmarshal([]byte(configMap.Data["sarama"]), &config); err != nil {
	// handle/log the parse error rather than silently ignoring it
}

If we need to save our current settings back to a configmap, that gets complicated (and will likely require our own in-sync copy of the sarama.Config struct with custom tags to ignore some JSON-unfriendly fields). This is possible and has been POC'd but will hopefully not be necessary for this issue (writing to the configmap will only be done manually when an administrator wants to make changes).

Implementing Config Changes

Regarding the procedures necessary to rebuild/restart the Sarama infrastructure when the configmap changes, there are three main components of note:

The ReconcileKind function calls SetKafkaAdminClient(), which calls kafkaadmin.CreateAdminClient() every time, so we don't have an "old admin client with old settings" to worry about for the moment. This will change if we decide to keep the admin client around in the future but for now this appears to be a non-issue (the only change would be to load the settings from whatever the current configmap is instead of using constants and/or environment variables).
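For reference, a sketch of what loading those settings at admin-client creation time could look like, using Sarama's public API directly (the function name and configmap key handling are illustrative, not the current eventing-kafka code):

package config

import (
	"encoding/json"

	"github.com/Shopify/sarama"
)

// newAdminClientFromConfigMap builds a cluster admin client from the current configmap
// contents instead of constants and/or environment variables.
func newAdminClientFromConfigMap(brokers []string, configMapData map[string]string) (sarama.ClusterAdmin, error) {
	config := sarama.NewConfig()      // start from Sarama defaults
	config.Version = sarama.V2_0_0_0  // ClusterAdmin needs a reasonably recent protocol version
	if saramaJSON, ok := configMapData["sarama"]; ok {
		// overlay only the fields present in the configmap JSON
		if err := json.Unmarshal([]byte(saramaJSON), config); err != nil {
			return nil, err
		}
	}
	return sarama.NewClusterAdmin(brokers, config)
}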

The implementation for the ConsumerGroup is all in one place, but there could be several pods, which would need to be reconfigured (or recreated). The UpdateSubscriptions() function loops through the subscribers and calls CreateConsumerGroup whenever it finds no subscriber wrapper for a particular subscriber UID. The ConsumerGroup goes into the SubscriberWrapper and remains part of the DispatcherImpl (created in the dispatcher's main.go and persisting for the life of the pod). If the ConfigMap watcher is only going to run on the controller, it might be easiest to restart all of the dispatcher pods to pick up any new configuration data (the controller could first parse the configmap and determine whether the changes are actually relevant to the ConsumerGroup, if desired).

Risk of trouble from restarting dispatcher pods is low, since the critical data is all stored in a Kafka topic, not the eventing-kafka pods themselves, but unnecessary pod churn is, of course, best avoided. We could easily start with just having the controller restart the dispatcher pods when the configmap changes (which will presumably not happen particularly often). Adjustments to a working system are likely easier than trying to work through all of the potential logic of changing the ConsumerGroups on the fly (either by having the dispatchers also watch the configmap or by using a different IPC mechanism driven by the controller).
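If we do start there, a rough sketch of the restart itself, patching the pod template with a timestamp annotation (the same mechanism kubectl rollout restart uses; the annotation key is made up and the Patch signature follows recent client-go):

package config

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// restartDispatcherDeployment bumps an annotation on the dispatcher deployment's pod
// template, triggering a rolling restart so new pods pick up the changed configmap.
func restartDispatcherDeployment(ctx context.Context, k8sClient kubernetes.Interface, namespace, name string) error {
	patch := fmt.Sprintf(
		`{"spec":{"template":{"metadata":{"annotations":{"eventing-kafka.dev/configmap-changed":%q}}}}}`,
		time.Now().Format(time.RFC3339))
	_, err := k8sClient.AppsV1().Deployments(namespace).Patch(
		ctx, name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
	return err
}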

The Producer is similar to the ConsumerGroup in that it is created once (in the NewProducer() function called by the channel's main.go code) and then used repeatedly during handleMessage(), which is called when a message is received (due to the eventingchannel.NewMessageReceiver() call in the channel's main.go).

If we handle configmap changes by restarting the producer pod, we must be certain that we never encounter a situation in which an event has been received (i.e. handleMessage has been called) and a non-error returned, but the message was not persisted to the Kafka topic.

I don't see any real way for this to happen, as ProduceKafkaMessage waits for the delivery report via the SyncProducer's SendMessage function before giving handleMessage the go-ahead to return without error. If the process is terminated anywhere in that execution path, the caller of handleMessage (the ServeHTTP function in eventing's message_receiver.go file) will get an error (network or otherwise) that will be passed back to the caller (that is, the customer). Errors are acceptable for a brief time during reconfiguration; if less downtime is desired, the producer could watch the configmap directly and reconfigure the SyncProducer itself instead of letting the controller bounce it.
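For illustration, the shape of that synchronous path (simplified; not the actual ProduceKafkaMessage code):

package config

import "github.com/Shopify/sarama"

// produceAndAck only returns nil after the SyncProducer has received the broker's
// acknowledgement, so the caller never reports success for a message that was not
// persisted to the topic.
func produceAndAck(producer sarama.SyncProducer, topic string, payload []byte) error {
	// SendMessage blocks until the delivery report (or an error) comes back.
	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(payload),
	}); err != nil {
		return err // surfaces through handleMessage and ServeHTTP back to the caller
	}
	return nil
}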

As with the ConsumerGroup, it is probably best to start with a naïve restarting of pods and see whether the required performance during configuration changes demands a modification to that approach.

eric-sap commented 4 years ago

Design decisions for this issue are as follows:

All pods should independently watch the configmap for changes.

This will let us use logic inside each pod to gracefully shut down the ConsumerGroups and the Producer.

Sarama components should only be restarted if the new configuration differs from the old one for the component in question.

For example, if the only value changed is part of the Consumer section of the sarama.Config struct, the Producer should not be restarted. This may require keeping a local copy of the old sarama.Config and comparing the structs of interest.
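A minimal sketch of that comparison, assuming we cache the previously applied sarama.Config (the grouping of sections is illustrative):

package config

import (
	"reflect"

	"github.com/Shopify/sarama"
)

// configComponentsChanged reports which Sarama components are affected by a new
// configuration, so a Consumer-only change does not restart the Producer (and vice versa).
func configComponentsChanged(oldCfg, newCfg *sarama.Config) (producerChanged, consumerChanged bool) {
	// Changes to shared sections (Net, Metadata, ClientID, etc.) affect everything.
	sharedChanged := !reflect.DeepEqual(oldCfg.Net, newCfg.Net) ||
		!reflect.DeepEqual(oldCfg.Metadata, newCfg.Metadata) ||
		oldCfg.ClientID != newCfg.ClientID

	// Compare copies of the Producer sections with the Partitioner cleared, since
	// reflect.DeepEqual never considers two non-nil funcs equal.
	oldProducer, newProducer := oldCfg.Producer, newCfg.Producer
	oldProducer.Partitioner, newProducer.Partitioner = nil, nil

	producerChanged = sharedChanged || !reflect.DeepEqual(oldProducer, newProducer)
	consumerChanged = sharedChanged || !reflect.DeepEqual(oldCfg.Consumer, newCfg.Consumer)
	return producerChanged, consumerChanged
}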

Use the common package as much as possible for the configmap watching and handling.

Obviously, some logic will be needed on a per-component basis, but the main code that watches and responds to our configmap changes should live in "common" to the extent that it can.

travis-minke-sap commented 4 years ago

/assign @eric-sap