Background
As part of the initial changes in remote cluster state, we are uploading the cluster metadata to the remote store during every cluster state publication. This remote cluster metadata is used by the cluster manager nodes during recovery. But there is no change to the way the cluster state is published to other nodes in the cluster. The cluster manager still sends the full or diff cluster state to data nodes and other cluster manager nodes over the transport layer. This increases the load on the cluster manager, since it has to send cluster state objects to all the nodes over the transport layer for every cluster state update. This becomes one of the bottlenecks in increasing the size of the cluster in terms of the number of indices, shards and nodes, as any increase in these parameters tends to increase the size of the cluster state.
Proposal
Instead of sending the cluster state object over the transport layer, the cluster manager node can send only the term and version of the cluster state. All the nodes can then use the term and version to download the corresponding cluster state directly from the remote store.
Related component
Cluster Manager
Additional context
Cluster State Persistence vs Publication
Before we get into the mechanism to publish the cluster state using the remote store, we should understand some differences between the two features:
Use case - Cluster state persistence, local or remote, is done for the purpose of recovery/durability. Cluster state publication is done to inform the other nodes about changes in the state so that they can take appropriate actions on the basis of those changes.
Relevant objects - Since the purpose of cluster state persistence is recovery, only the cluster state metadata object is persisted, since this is the object required for recovery. The rest of the objects are reconstructed by the cluster. But for publication, we need the entire cluster state object, including ephemeral entities like snapshots-in-progress. The complete list of objects which are not persisted to the local/remote store is as follows: transient cluster settings, hashes of consistent settings, cluster blocks, discovery nodes, routing table, custom objects (snapshots in progress, snapshot deletion in progress, restore in progress, repository cleanup in progress).
Serialization - The cluster metadata is serialized for persistence using the toXContent method. This method does not consider the OpenSearch version of the node, since it is assumed that during recovery the cluster state will be read by a node of the same version. But for publication, it is possible that there are nodes of different versions in the cluster, for example during a version upgrade. So for publication, the serialization is done using the writeTo method. The stream which is passed as an input parameter to this method contains the version information. There may be some custom logic based on the node version which can affect the serialization. If the cluster state is serialized using toXContent for publication, it is possible that the serialized cluster state cannot be read by a node of a different version.
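To make the serialization point concrete, here is a minimal sketch of version-gated writing and reading. It does not use OpenSearch's stream classes; `VersionedSettingsSketch`, the version constants `V1`/`V2` and `newField` are all hypothetical names chosen for illustration, and plain `DataOutputStream`/`DataInputStream` stand in for the transport streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a cluster state component; not an OpenSearch class.
class VersionedSettingsSketch {
    static final int V1 = 1; // hypothetical older node version
    static final int V2 = 2; // hypothetical newer node version that introduced newField

    final String name;
    final int newField; // only understood by nodes on V2 or later

    VersionedSettingsSketch(String name, int newField) {
        this.name = name;
        this.newField = newField;
    }

    // Analogous to writeTo: the target node's version decides which fields are written.
    byte[] writeTo(int targetVersion) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            if (targetVersion >= V2) {
                out.writeInt(newField);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // A reader on readerVersion expects exactly the fields its version knows about.
    static VersionedSettingsSketch readFrom(byte[] data, int readerVersion) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String name = in.readUTF();
            int newField = readerVersion >= V2 ? in.readInt() : 0;
            return new VersionedSettingsSketch(name, newField);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch, bytes written for `V1` cannot be read by a `V2` reader (the read runs past the end of the data), which is the kind of mismatch that a single version-unaware serialized blob could cause in a mixed-version cluster.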
Diff Computation of Cluster State in Existing Flow
The diff is computed in a different manner for each of the objects.
For the primitive data types like long, boolean and String, only the latest data is added irrespective of the previous state.
For objects which are not diffable, like CoordinationMetadata and Settings, the entire latest data is serialized.
For objects which implement the AbstractDiffable class, like DiscoveryNodes, the diff is computed using the CompleteDiff class. This class either adds an empty object if the previous and current data are the same, or adds the entire current data if they are different.
For generic maps, like the index name to IndexMetadata map, the diff is computed using DiffableUtils. This utility computes the diff by finding the inserts, updates and deletes to the map with respect to the previous state. The deleted keys are added to the deletes list. The inserted entries are added to the upserts map. The updated entries are added to the diffs map if the value supports diff; otherwise they are added to the upserts map.
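The map diff described above can be sketched roughly as follows. This is a simplified illustration and not the actual DiffableUtils code: `MapDiffSketch` is a hypothetical class, and values are plain strings that are treated as non-diffable, so updated entries land in the upserts map rather than in a separate diffs map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of a map diff; not the actual DiffableUtils implementation.
class MapDiffSketch {
    final List<String> deletes = new ArrayList<>();       // keys removed from the map
    final Map<String, String> upserts = new TreeMap<>();  // inserted or updated entries

    // Computes the deletes and upserts needed to turn "before" into "after".
    static MapDiffSketch diff(Map<String, String> before, Map<String, String> after) {
        MapDiffSketch d = new MapDiffSketch();
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                d.deletes.add(key);
            }
        }
        for (Map.Entry<String, String> e : after.entrySet()) {
            String old = before.get(e.getKey());
            // Values are non-diffable here, so any change sends the whole value.
            if (old == null || !old.equals(e.getValue())) {
                d.upserts.put(e.getKey(), e.getValue());
            }
        }
        return d;
    }

    // Applies the diff on top of the previous map and returns the new map.
    Map<String, String> apply(Map<String, String> before) {
        Map<String, String> result = new HashMap<>(before);
        for (String key : deletes) {
            result.remove(key);
        }
        result.putAll(upserts);
        return result;
    }
}
```

Note that an updated entry carries its full new value, which is why a small change to a large value (for example, index metadata) still produces a large diff.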
    ClusterState {
        Metadata metadata {
            long version; // Latest data added to diff
            String clusterUUID; // Latest data added to diff
            boolean clusterUUIDcommitted; // Latest data added to diff
            CoordinationMetadata coordinationMetadata; // Latest data added to diff. But this
            // should not be huge since it contains mostly the nodeIds for voting configuration
            // and voting exclusions.
            Settings transientSettings; // Latest data added to diff
            Settings persistentSettings; // Latest data added to diff
            DiffableStringMap hashesOfConsistentSettings; // Diff is computed for the map by
            // finding the inserts, updates and deletes to the map.
            Map<String, IndexMetadata> indices; // Diff is computed using DiffableUtils
            // which adds the inserts, deletes and updates to the diff.
            // Most of the IndexMetadata is made of settings and mappings.
            // The settings are not diffable and the diff for mappings is computed using CompleteDiff.
            // CompleteDiff adds the entire object if there is any change in it, however small;
            // otherwise it adds an empty object to the diff.
            // So if a single field is added to the mappings, the entire mappings will be added
            // to the diff since the mappings are compressed.
            // The size of Settings will generally be small, but in cases where in-line analyzers
            // are present, the size of settings can be large.
            Map<String, IndexTemplateMetadata> templates; // Diff computed using DiffableUtils.
            // But for IndexTemplateMetadata, the diff is computed using CompleteDiff.
            // So values in the upserts map of the diff will contain the entire index templates
            // which have been added or updated.
            Map<String, Custom> customs; // Diff computed using DiffableUtils.
            // So the customs which are changed will be added to the upserts map in the diff
            // and the customs which are deleted will be added to the deletes list.
        }
        String stateUUID; // from and to UUID are added to the diff
        long version; // from and to version are added to the diff
        RoutingTable routingTable {
            long version;
            Map<String, IndexRoutingTable> indicesRouting; // Diff computed using DiffableUtils.
            // Indices whose routing table is deleted will be added to the deletes list.
            // For indices which are added or updated, the routing table will be added to the upserts map.
        }
        DiscoveryNodes nodes;
        // Diff computed using CompleteDiff.
        // CompleteDiff sends an empty object if the previous and current are the same.
        // It sends the full object if the previous and current are different.
        ClusterBlocks blocks;
        // Diff computed using CompleteDiff.
        Map<String, Custom> customs;
        // Diff computed using DiffableUtils.
        // But all the Custom classes implement the AbstractNamedDiffable class.
        // So when DiffableUtils computes the diff, the entries will be added to the upserts map.
        // The values in the upserts map will be CompleteDiff objects which hold either
        // an empty object or the full object.
    }
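The "empty or full object" behavior attributed to CompleteDiff in the comments above can be illustrated with a small sketch. `CompleteDiffSketch` is a hypothetical class written for this illustration and is not the OpenSearch implementation; equality is assumed to be the only change check.

```java
import java.util.Objects;

// Simplified illustration of "all or nothing" diffing; not the actual CompleteDiff class.
class CompleteDiffSketch<T> {
    private final T fullState; // null means "nothing changed"

    private CompleteDiffSketch(T fullState) {
        this.fullState = fullState;
    }

    // Carries nothing when the two states are equal, otherwise the entire new state.
    static <T> CompleteDiffSketch<T> between(T before, T after) {
        return Objects.equals(before, after)
            ? new CompleteDiffSketch<T>(null)
            : new CompleteDiffSketch<T>(after);
    }

    boolean isEmpty() {
        return fullState == null;
    }

    // Keeps the previous state for an empty diff, otherwise replaces it wholesale.
    T apply(T before) {
        return fullState == null ? before : fullState;
    }
}
```

With this shape, adding one field to a large object produces a diff as large as the object itself, which matches the observation about mappings above.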
Summary
The computation of diff is not very optimized.
For index metadata, the diff contains the index metadata of the indices which have changed. This diff of index metadata will always contain the full settings. It will contain the full mappings even when only a single field is updated in the mapping. This computation of diff is very similar to the incremental metadata that we upload currently, where we upload the entire index metadata whenever there is a small change in it.
For the routing table, even if we upload the diff, it will contain the full routing table for each of the indices which have changed. So this would be the same as the existing approach designed for the routing table: split the routing table by indices and update them individually as and when they change.
So we know that we have to upload all the incrementally changed objects for every cluster state update anyway. If we also upload the diff as currently computed, which is not fully optimized, we will be uploading a lot of redundant data. Instead, we can upload an additional object which identifies the components of the cluster state that have changed. The follower nodes would then use this object to download the incremental changes and apply them to the cluster state.
Approach
Based on the above analysis, we need to achieve the below tasks for the remote state publication to work.
Upload Ephemeral Objects to Remote Store
Currently we only publish the Metadata object in the cluster state to the remote store. This is sufficient for the durability/persistence use case, as the rest of the objects can be recreated by the cluster manager node. But cluster state publication needs all the other objects as well. So we would need to publish all the objects to the remote store. The extra objects to be published are:
RoutingTable - There is already a plan to upload this to the remote store. As per the design, this will be split at a per-index level and the routing table for each index will be uploaded to the remote store. The file locations for RoutingTable will be present in the RoutingTableManifest file.
DiscoveryNodes - This should be uploaded as a single file.
ClusterBlocks - This should be uploaded as a single file.
Map<String, ClusterState.Custom> - This contains the entries for SnapshotsInProgress, SnapshotDeletionsInProgress, RestoreInProgress, RepositoryCleanupInProgress. Each of these entries should be uploaded as a separate file to the remote store.
Transient Cluster Settings - This should be uploaded as a single file to remote store.
Hashes of Consistent Settings - This should be uploaded as a single file to remote store.
Upload a Diff Object
In order to publish the cluster state, we will first publish a diff file to the remote store. Based on the analysis of the diff calculation above, it is not optimal to publish a single diff blob to the remote store: when multiple indices have changed, the diff blob would contain the entire metadata of all these indices, which would make the diff file large. Instead, the diff file would contain only the information about which objects have changed. Based on this information, the file locations for these objects can be fetched from the manifest and downloaded. The diff file would look like below:
This diff object can be part of the ClusterMetadataManifest itself as it should not be very large.
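As a rough illustration of what such a diff object could carry, here is a hypothetical sketch. Every field and method name below is an assumption made for this example and does not describe the actual manifest format; the only elements taken from the analysis above are the from/to state UUIDs and the idea of listing changed components instead of their contents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the diff object; all names are assumptions, not the real format.
class ClusterStateDiffManifestSketch {
    final String fromStateUUID; // state the diff was computed against
    final String toStateUUID;   // state produced by applying the diff
    boolean settingsUpdated;
    boolean discoveryNodesUpdated;
    boolean clusterBlocksUpdated;
    final List<String> indicesUpdated = new ArrayList<>();
    final List<String> indicesDeleted = new ArrayList<>();
    final List<String> indicesRoutingUpdated = new ArrayList<>();
    final List<String> customsUpdated = new ArrayList<>();

    ClusterStateDiffManifestSketch(String fromStateUUID, String toStateUUID) {
        this.fromStateUUID = fromStateUUID;
        this.toStateUUID = toStateUUID;
    }

    // The diff is only usable on top of the state it was computed from.
    boolean canApplyTo(String localStateUUID) {
        return fromStateUUID.equals(localStateUUID);
    }

    // Lists the changed components whose files must be looked up in the manifest.
    List<String> componentsToDownload() {
        List<String> names = new ArrayList<>();
        if (settingsUpdated) names.add("settings");
        if (discoveryNodesUpdated) names.add("discoveryNodes");
        if (clusterBlocksUpdated) names.add("clusterBlocks");
        for (String index : indicesUpdated) names.add("index/" + index);
        for (String index : indicesRoutingUpdated) names.add("routing/" + index);
        for (String custom : customsUpdated) names.add("custom/" + custom);
        return names;
    }
}
```

Because the object holds only names and flags, its size grows with the number of changed components rather than with their content, which is why it could plausibly live inside the manifest.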
Cluster Manager Node Publishes the term and version
Current state: In order to publish the cluster state the cluster manager node sends the diff state to all the nodes over the transport layer. If a follower node cannot apply the diff it responds with an exception and then the cluster manager sends the full state to the node.
When a node is trying to join the cluster, the cluster manager sends the cluster state in a validate join request to the joining node.
Proposed: The cluster manager node will send only the term and version of the cluster state. The follower node will download the manifest file for that term and version. From the manifest file, the node will get the diff and determine whether the diff can be applied. If the diff cannot be applied, it will download all the files for the new cluster state. If the diff can be applied, it will find the objects which have changed, download the files for those objects and apply them to the old cluster state.
In the validate join request as well, only the term and version will be sent. The joining node would download the full state corresponding to the term and version and perform the validations. The OpenSearch version of the joining node will be checked to ensure we have published the cluster state for this version.
A new transport request will be created for the above changes.
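The proposed follower flow can be sketched end to end with an in-memory stand-in for the remote store. All class, field and blob names here (`RemotePublicationSketch`, `Manifest`, `RemoteStore`, `Follower`, the `term/version` key) are hypothetical, and the sketch leaves out deletions, error handling and the transport request itself.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RemotePublicationSketch {
    // Hypothetical manifest: where each component lives and what changed since fromStateUUID.
    static class Manifest {
        final String fromStateUUID;
        final String toStateUUID;
        final Map<String, String> componentToBlob; // component name -> blob key
        final List<String> changedComponents;

        Manifest(String from, String to, Map<String, String> componentToBlob, List<String> changed) {
            this.fromStateUUID = from;
            this.toStateUUID = to;
            this.componentToBlob = componentToBlob;
            this.changedComponents = changed;
        }
    }

    // In-memory stand-in for the remote store.
    static class RemoteStore {
        final Map<String, Manifest> manifests = new HashMap<>();
        final Map<String, String> blobs = new HashMap<>();
        int downloads = 0;

        Manifest manifest(long term, long version) {
            return manifests.get(term + "/" + version);
        }

        String download(String blobKey) {
            downloads++;
            return blobs.get(blobKey);
        }
    }

    static class Follower {
        String stateUUID; // null until the first state is applied
        final Map<String, String> components = new HashMap<>();

        // Called when the cluster manager publishes only (term, version).
        void onPublish(long term, long version, RemoteStore store) {
            Manifest m = store.manifest(term, version);
            boolean diffApplies = stateUUID != null && stateUUID.equals(m.fromStateUUID);
            Collection<String> toFetch;
            if (diffApplies) {
                toFetch = m.changedComponents;        // incremental download
            } else {
                components.clear();
                toFetch = m.componentToBlob.keySet(); // full download
            }
            for (String component : toFetch) {
                components.put(component, store.download(m.componentToBlob.get(component)));
            }
            stateUUID = m.toStateUUID;
        }
    }
}
```

A follower that already holds the previous state fetches only the changed components, while a follower that is behind (or a joining node) falls back to downloading every component listed in the manifest.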
Considerations
Version Specific Serialization of Cluster State
When the cluster manager is on a certain OpenSearch version, there can be some follower nodes which are on a different version. So the cluster state is sent over the transport layer by serializing it according to the target node’s version. The serialization is done using the writeTo method since this method is version aware.
The cluster manager node has visibility into the different versions of nodes present in the cluster. It should publish the cluster state for all the versions of nodes present in the cluster. Ideally this should only happen during a version upgrade, so there should be a maximum of 2 different versions present in the cluster at a time.
So we need to analyze and decide how the serialization should be done for remote state publication.
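One possible way to picture "publish for all versions present" is to serialize once per distinct node version and reuse the result for every node on that version. The sketch below assumes versions are plain strings and uses a placeholder serializer; `PerVersionSerializationSketch` and its methods are hypothetical and do not settle the open question of how serialization should actually be done.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one serialized copy of the state per distinct node version.
class PerVersionSerializationSketch {
    int serializations = 0; // counts how many times the state was actually serialized
    final Map<String, byte[]> byVersion = new LinkedHashMap<>();

    // Serializes the state once for each distinct version found among the nodes.
    Map<String, byte[]> serializeForNodes(List<String> nodeVersions, String state) {
        for (String nodeVersion : nodeVersions) {
            byVersion.computeIfAbsent(nodeVersion, v -> serialize(state, v));
        }
        return byVersion;
    }

    // Placeholder for version-aware serialization (the role writeTo plays today).
    private byte[] serialize(String state, String version) {
        serializations++;
        return (version + ":" + state).getBytes(StandardCharsets.UTF_8);
    }
}
```

During a rolling upgrade with two versions in the cluster, this would mean two uploads per cluster state update regardless of the number of nodes.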
https://github.com/opensearch-project/OpenSearch/issues/11744