Closed rukai closed 3 weeks ago
Comparing rukai:kafka_sink_cluster_handle_receive_errors (ce5e400) with main (09ba622): ✅ 39 untouched benchmarks
Rewrite the PR to focus only on scram_over_mtls handling; this should make things a little simpler.
Done.
Currently KafkaSinkCluster ignores failures to receive messages: https://github.com/shotover/shotover-proxy/blob/17ccad5e257bf06836283ad8c49970b3dd63e749/shotover/src/transforms/kafka/sink_cluster/mod.rs#L1041 This is quite bad since many kinds of issues are silently ignored; none of our other transforms behave this way.
This PR fixes this by bubbling up errors when receiving from a connection. However, there are a few latent issues that were hidden by the lack of error handling here; these must be resolved before we can enable handling of receive errors.
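The change in error handling can be sketched as below. This is a minimal illustration, not shotover's actual code: `Connection`, `try_recv`, and `recv_all` are hypothetical stand-ins for the real transform types.

```rust
// Hypothetical sketch of bubbling up receive errors instead of ignoring them.

#[derive(Debug)]
struct ConnectionError(String);

struct Connection {
    closed: bool,
}

impl Connection {
    // Returns any pending responses, or an error if the connection failed.
    fn try_recv(&self) -> Result<Vec<String>, ConnectionError> {
        if self.closed {
            Err(ConnectionError("connection closed".into()))
        } else {
            Ok(vec!["response".into()])
        }
    }
}

// Before: the error was silently dropped, e.g. `let _ = connection.try_recv();`
// After: the error is propagated to the caller with `?`.
fn recv_all(connections: &[Connection]) -> Result<Vec<String>, ConnectionError> {
    let mut responses = vec![];
    for connection in connections {
        responses.extend(connection.try_recv()?);
    }
    Ok(responses)
}

fn main() {
    let healthy = [Connection { closed: false }];
    assert!(recv_all(&healthy).is_ok());

    let broken = [Connection { closed: false }, Connection { closed: true }];
    // The failure now surfaces instead of being dropped.
    assert!(recv_all(&broken).is_err());
    println!("receive errors are propagated");
}
```

With errors propagated like this, a dead connection fails the request loudly rather than hanging or silently losing responses.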
Recreate connections before the delegation token expires
When scram_over_mtls is enabled, connections are closed when the token they auth'd with expires. This PR introduces a system for recreating token-authed connections before the token expires. This new system is implemented by:
- `KafkaConnection::General` - the existing behaviour, just routes directly to SinkConnection methods
- `KafkaConnection::ScramOverMtls` - the new behaviour for scram over mtls
- `ConnectionState::AtRiskOfAuthTokenExpiry` - a state the connection enters before the scram connection expires

Alternate approach
As an alternative to the timeout approach, we could rewrite the token task to send an `ArcSwap<DelegationToken>` which is then live-updated; we could then check this token every time we send to determine whether we need to recreate the connection. We would still need the old_connection fallback logic though, so this PR would largely remain the same. So I propose we land this, and we could possibly swap to the ArcSwap approach in the future if it proves valuable.
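The alternative described above can be sketched as below. The names are illustrative, not shotover's actual types; the PR mentions `ArcSwap<DelegationToken>` (from the arc-swap crate), but `std::sync::RwLock<Arc<_>>` is used here as a dependency-free stand-in playing the same load/store role.

```rust
// Sketch of the ArcSwap alternative: a background token task publishes a
// live-updated token, and every send checks whether the connection was
// created from an older token. All names here are hypothetical.

use std::sync::{Arc, RwLock};

#[derive(Clone, PartialEq, Debug)]
struct DelegationToken {
    id: u64,
}

struct Connection {
    // The token this connection authenticated with.
    token: DelegationToken,
}

struct ScramOverMtlsConnection {
    connection: Connection,
    // Live-updated token shared with the background token task.
    current_token: Arc<RwLock<Arc<DelegationToken>>>,
}

impl ScramOverMtlsConnection {
    // Check the shared token on every send; recreate the connection if the
    // token it authenticated with is no longer current.
    fn send(&mut self, _message: &str) {
        let current = self.current_token.read().unwrap().clone();
        if *current != self.connection.token {
            // Hypothetical recreation: open a new connection auth'd with
            // the fresh token.
            self.connection = Connection { token: (*current).clone() };
        }
        // ... actually transmit the message here ...
    }
}

fn main() {
    let shared = Arc::new(RwLock::new(Arc::new(DelegationToken { id: 1 })));
    let mut conn = ScramOverMtlsConnection {
        connection: Connection { token: DelegationToken { id: 1 } },
        current_token: shared.clone(),
    };

    // The token task rotates the token before it expires.
    *shared.write().unwrap() = Arc::new(DelegationToken { id: 2 });

    conn.send("metadata request");
    // The connection was recreated with the fresh token.
    assert_eq!(conn.connection.token.id, 2);
    println!("connection recreated with token {}", conn.connection.token.id);
}
```

Either way the recreation logic is the same; the two approaches only differ in whether expiry is detected by a timeout or by observing the rotated token at send time, which is why landing the timeout version first loses nothing.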