This PR creates a new `NodeState` type and adds a `state: Box<NodeState>` field to `KafkaNode`. This allows each transform instance to read (and write!) the up/down state of each Kafka node.
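The `NodeState` definition itself is not shown here, but a minimal sketch of the property it needs is below (the names and fields are illustrative assumptions, not the actual shotover code): since `KafkaNode`s get cloned between the shared and local lists, the up/down flag has to live behind shared ownership so a write from one transform instance is visible to the others.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical sketch: cloning a node must not fork its state, so the
// up/down flag sits behind an Arc that every clone shares.
#[derive(Clone, Debug)]
struct NodeState {
    up: Arc<AtomicBool>,
}

impl NodeState {
    fn new() -> Self {
        NodeState {
            up: Arc::new(AtomicBool::new(true)),
        }
    }

    // Any instance holding a clone can read the state...
    fn is_up(&self) -> bool {
        self.up.load(Ordering::Relaxed)
    }

    // ...and write it, e.g. after observing a connection failure.
    fn set_up(&self, up: bool) {
        self.up.store(up, Ordering::Relaxed);
    }
}
```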
This approach relies on `self.nodes` being an exact clone of `self.nodes_shared`; `self.nodes` should be purely an optimization that avoids constantly taking read locks on `self.nodes_shared`. However, that is currently not true! There are two things breaking this invariant:
1. During the handshake stage, `KafkaSinkCluster` inserts nodes based on the `first_contact_points` directly into `self.nodes`, bypassing `self.nodes_shared`.
2. Instead of cloning from `self.nodes_shared`, fields are manually copied over and new nodes are added. This works around the fact that `KafkaNode`s have a `connection` field that must be used in `self.nodes` nodes but never in `self.nodes_shared` nodes.
So in order to make this `KafkaNode::state` field robust, we need to remove these two invariant breakages, and that is what the rest of the changes in this PR are for.
The core change is that the `connection` field is removed from `KafkaNode`; instead we keep a `BrokerId -> SinkConnection` hashmap, so we can easily look up the connection for a given `KafkaNode`. In fact, this hashmap goes a little further than that: this enum is defined for use as the hashmap's key:
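A sketch of the idea (the enum, variant, and type names here are illustrative stand-ins, not the actual definitions):

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the real types.
type BrokerId = i32;

#[derive(Debug)]
struct SinkConnection {
    address: String,
}

// Hypothetical shape of the hashmap key: one variant per broker,
// plus a dedicated variant for the control connection.
#[derive(Debug, PartialEq, Eq, Hash)]
enum Destination {
    NodeConnection(BrokerId),
    ControlConnection,
}
```

With this key, a single `HashMap<Destination, SinkConnection>` holds both the per-broker connections and the control connection.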
This allows us to store not just the connections for the `KafkaNode`s but also a control connection, which can be used for routing both handshake and metadata requests. This resolves the first invariant breakage: since we can route the handshake to the control connection, we no longer need to insert nodes directly into `self.nodes` in order to route handshake messages, which must occur before we can request node metadata from the cluster.
The second invariant breakage is resolved by the removal of the `connection` field from `KafkaNode`. This allows us to turn the `update_local_nodes` method into a very simple clone from `self.nodes_shared` into `self.nodes`.
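As a sketch, assuming simplified stand-in types for `KafkaNode` and the cluster (field names here are illustrative), the whole method collapses to one clone of the shared list:

```rust
use std::sync::{Arc, RwLock};

// Simplified stand-in: with the connection field removed, the node is
// plain data that can be cloned freely between shared and local lists.
#[derive(Clone, Debug, PartialEq)]
struct KafkaNode {
    broker_id: i32,
    address: String,
}

struct KafkaSinkCluster {
    // Local cache, private to this transform instance.
    nodes: Vec<KafkaNode>,
    // Shared list, updated by whichever instance fetches metadata.
    nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
}

impl KafkaSinkCluster {
    // The local list is now exactly a clone of the shared list,
    // preserving the invariant the state field relies on.
    fn update_local_nodes(&mut self) {
        self.nodes = self.nodes_shared.read().unwrap().clone();
    }
}
```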
Alternative approaches

I originally tried to solve this problem via https://github.com/shotover/shotover-proxy/pull/1702 but I prefer the approach in this PR.

Advantages to this approach:
- Shared and local `KafkaNode`s are now completely compatible. Previously we needed to manually filter out and reinsert the `connection` field.
- The control connection is now less special and is stored along with all the other connections.

Old connection handling

A large motivation for the original approach of storing connections in the nodes was that we would call `.send()` directly on the connection. That is no longer true, since we now perform a routing phase before calling `.send()`.