opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Segment Replication] Should consider using RAFT consensus algorithm for Segment replication #6369

Open Jeevananthan-23 opened 1 year ago

Jeevananthan-23 commented 1 year ago

Hello @mikemccand / @mch2, I would like to understand how shard promotion (leader election) works in the context of the proposals below. Why not consider a distributed consensus algorithm like Raft?

  1. Send {n} (where n is larger than the largest segment counter in the current SegmentInfos) to the master node before the segment replication process, so that the master can tell the newly promoted primary shard not to generate any segment numbered less than {n}. To reduce pressure on the master node, we don't need to send this information every time. For example, if the max segment is _4.si in the primary's current SegmentInfos, we can send _rw (1004 in decimal) to the master node. After the segment counter increases to _rw.si, we send _1jo (2004 in decimal) to the master node.

What happens when the primary shard dies first inside the cluster and the newly promoted primary shard has the same segment number, and how does promotion happen?

  1. Before choosing a replica for promotion, the master node must ask all replicas which of them has the newest replication state.

Using a distributed consensus algorithm like Raft should be a great choice, because it could cover copying merged segments and transferring them to replicas, and it supports leader election, as @mikemccand mentioned in his blog post on Segment Replication cluster state.

Originally posted by @Jeevananthan-23 in https://github.com/opensearch-project/OpenSearch/issues/2212#issuecomment-1429784251

Jeevananthan-23 commented 1 year ago

Tracking existing Shard Promotion(Leader Election):

mch2 commented 1 year ago

@Jeevananthan-23 Thanks for raising this! Consensus is useful for electing cluster manager nodes, but I don't think it's required on primary failure.

#2212 is about handling failover within a replication group with segment replication enabled. During failover today, the cluster manager node decides which replica should be elected as the new primary within the replication group here, by considering only whether the candidate is active and selecting the one furthest ahead in terms of OpenSearch version. With segment replication, we also want to take into account the candidate's latest SegmentInfos version. We want to do this to ensure that we 1) do not reindex documents that have already been indexed and 2) avoid creating new segments with the same name as segments that already exist somewhere within the replication group.

What happens when the primary shard dies first inside the cluster and the newly promoted primary shard has the same segment number, and how does promotion happen?

This is the desired case with segment replication. The newly promoted primary would have previously been syncing segments with the old primary, so it will have up to the old primary's latest segments at the time of failover. The new primary will continue indexing and create new segments that no other replica in the group has.

If the newly elected primary is behind the old primary but another replica in the replication group is up to date, this is where the conflict occurs. In this case the newly elected primary will replay from its translog after promotion and create new segments with the same names as existing segments on the other replica. https://github.com/opensearch-project/OpenSearch/pull/4365 was an attempt to make the newly elected primary create new segments only with names higher than that of any segment on a pre-existing replica. However, this solution is not foolproof: we only bump the counter (which drives the segment name) by an arbitrary amount, so if the newly elected primary was behind the old one by more than that amount, we could still see conflicts. If this happens, the newly elected primary will continue, yet the replicas would fail and need recovery.
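To make the failure mode concrete, here is a minimal sketch of the name collision described above. It relies on Lucene naming segments as an underscore followed by the segment counter in base 36; everything else (the function names, the `bump` parameter, and modelling the other replica as a plain set of counters) is hypothetical and only illustrates the argument, not the actual OpenSearch code.

```python
DIGITS = "0123456789abcdefghijklmnopqrstuvwxyz"


def segment_name(counter: int) -> str:
    """Lucene-style segment name: '_' plus the counter written in base 36."""
    if counter == 0:
        return "_0"
    out = ""
    while counter > 0:
        counter, rem = divmod(counter, 36)
        out = DIGITS[rem] + out
    return "_" + out


def colliding_names(promoted_counter: int, bump: int,
                    counters_on_other_replica: set, new_segments: int) -> list:
    """Names the promoted primary would reuse after bumping its counter.

    promoted_counter: next counter on the (stale) replica being promoted.
    bump: the arbitrary amount added to the counter on promotion (assumed).
    counters_on_other_replica: counters of segments an up-to-date replica holds.
    new_segments: how many segments the new primary writes after promotion.
    """
    start = promoted_counter + bump
    created = range(start, start + new_segments)
    return [segment_name(c) for c in created if c in counters_on_other_replica]


# The promoted replica was 20+ segments behind, so a bump of 20 is not enough.
print(colliding_names(10, 20, {30, 31, 40}, 3))   # names that clash
print(colliding_names(10, 100, {30, 31, 40}, 3))  # large enough bump: no clash
```

With a bump smaller than the gap between the promoted replica and the furthest-ahead replica, the first call returns clashing names; a bump larger than the gap returns an empty list.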

Send {n} (where n is larger than the largest segment counter in the current SegmentInfos) to the master node before the segment replication process, so that the master can tell the newly promoted primary shard not to generate any segment numbered less than {n}. To reduce pressure on the master node, we don't need to send this information every time. For example, if the max segment is _4.si in the primary's current SegmentInfos, we can send _rw (1004 in decimal) to the master node. After the segment counter increases to _rw.si, we send _1jo (2004 in decimal) to the master node.

This was a suggestion to store the former primary's state within cluster state, so that we increase the counter by a known amount instead of some arbitrary long.
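The arithmetic in the quoted suggestion can be sketched as follows. The step of 1000 is read off the example (`_4` to `_rw` to `_1jo`); `parse_counter` and `next_watermark` are hypothetical helper names, and the rule "report again once the counter reaches the last reported value" is an assumption about how the proposal would work, not existing code.

```python
from typing import Optional


def parse_counter(name: str) -> int:
    """Turn a segment name such as '_rw' or '_4.si' into its decimal counter."""
    stem = name.split(".")[0]        # drop a file extension like '.si'
    return int(stem.lstrip("_"), 36)  # segment counters are written in base 36


def next_watermark(max_counter: int, reported: Optional[int],
                   step: int = 1000) -> Optional[int]:
    """Return a new watermark to send to the cluster manager, or None.

    A new value is sent only when nothing has been reported yet, or when the
    primary's counter has caught up with the previously reported watermark.
    """
    if reported is None or max_counter >= reported:
        return max_counter + step
    return None


print(parse_counter("_4.si"), parse_counter("_rw"), parse_counter("_1jo"))
print(next_watermark(parse_counter("_4.si"), None))   # first report
print(next_watermark(500, 1004))                      # still below watermark
print(next_watermark(parse_counter("_rw"), 1004))     # caught up: report again
```

A newly promoted primary would then start its counter at the stored watermark, which is a known upper bound on every segment name the old primary could have handed out.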

IMO we should update this logic that executes on cluster managers to fetch the latest checkpoint from all candidate replicas and select the one with the highest value. This would add some latency to fetch from each replica, but I can't imagine it being too expensive in exchange for correctness. Alternatively, we could store this in cluster state after each replica updates to a new set of segments, so that cluster managers already have this state, yet this would be a frequent update.
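A rough sketch of the selection rule being proposed, under stated assumptions: the `Candidate` type and its fields are hypothetical, and ranking the checkpoint ahead of the node version (using the version only as a tie-break) is one possible ordering, not something the thread settles.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Candidate:
    node_id: str
    active: bool
    node_version: tuple          # e.g. (2, 6, 0); today's selection criterion
    segment_infos_version: int   # assumed: latest checkpoint fetched from the replica


def pick_new_primary(candidates: Iterable[Candidate]) -> Optional[Candidate]:
    """Pick the active replica with the highest checkpoint.

    Ties on the checkpoint fall back to the highest node version.
    Returns None when no active candidate exists.
    """
    active = [c for c in candidates if c.active]
    if not active:
        return None
    return max(active, key=lambda c: (c.segment_infos_version, c.node_version))


replicas = [
    Candidate("r1", True, (2, 6, 0), 12),
    Candidate("r2", True, (2, 5, 0), 15),   # furthest ahead in segments
    Candidate("r3", False, (2, 6, 0), 99),  # inactive, so never eligible
]
print(pick_new_primary(replicas).node_id)
```

Choosing the furthest-ahead replica means no other copy in the group holds segments the new primary lacks, which is the condition that avoids the name conflicts discussed earlier.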

Jeevananthan-23 commented 1 year ago

@Jeevananthan-23 Thanks for raising this! Consensus is useful for electing cluster manager nodes, but I don't think it's required on primary failure.

@mch2 Sorry for the late reply. I did some research on existing Elasticsearch consensus solutions, and they also don't rely on Raft consensus.

#2212 is about handling failover within a replication group with segment replication enabled. During failover today, the cluster manager node decides which replica should be elected as the new primary within the replication group here, by considering only whether the candidate is active and selecting the one furthest ahead in terms of OpenSearch version. With segment replication, we also want to take into account the candidate's latest SegmentInfos version. We want to do this to ensure that we 1) do not reindex documents that have already been indexed and 2) avoid creating new segments with the same name as segments that already exist somewhere within the replication group.

As you mentioned here, promotion of the new primary must take the latest SegmentInfos version into account.

What happens when the primary shard dies first inside the cluster and the newly promoted primary shard has the same segment number, and how does promotion happen?

This is the desired case with segment replication. The newly promoted primary would have previously been syncing segments with the old primary, so it will have up to the old primary's latest segments at the time of failover. The new primary will continue indexing and create new segments that no other replica in the group has.

If the newly elected primary is behind the old primary but another replica in the replication group is up to date, this is where the conflict occurs. In this case the newly elected primary will replay from its translog after promotion and create new segments with the same names as existing segments on the other replica. #4365 was an attempt to make the newly elected primary create new segments only with names higher than that of any segment on a pre-existing replica. However, this solution is not foolproof: we only bump the counter (which drives the segment name) by an arbitrary amount, so if the newly elected primary was behind the old one by more than that amount, we could still see conflicts. If this happens, the newly elected primary will continue, yet the replicas would fail and need recovery.

My proposal here is that we should first consider the translog when promoting the newly elected primary, by using Raft.

Send {n} (where n is larger than the largest segment counter in the current SegmentInfos) to the master node before the segment replication process, so that the master can tell the newly promoted primary shard not to generate any segment numbered less than {n}. To reduce pressure on the master node, we don't need to send this information every time. For example, if the max segment is _4.si in the primary's current SegmentInfos, we can send _rw (1004 in decimal) to the master node. After the segment counter increases to _rw.si, we send _1jo (2004 in decimal) to the master node.

This was a suggestion to store the former primary's state within cluster state, so that we increase the counter by a known amount instead of some arbitrary long.

IMO we should update this logic that executes on cluster managers to fetch the latest checkpoint from all candidate replicas and select the one with the highest value. This would add some latency to fetch from each replica, but I can't imagine it being too expensive in exchange for correctness. Alternatively, we could store this in cluster state after each replica updates to a new set of segments, so that cluster managers already have this state, yet this would be a frequent update.

How is the latest checkpoint fetched in sequence-number-based replication? As you mentioned, fetching it adds some latency; at this point, the right implementation of Raft for coordination should help.

I know this may be difficult to implement, but I am looking forward to the benchmarking results in #2583.

Thanks!

shwetathareja commented 1 year ago

IMO we should update this logic that executes on cluster managers to fetch the latest checkpoint from all candidate replicas and select the one with the highest value. This would add some latency to fetch from each replica, but I can't imagine it being too expensive in exchange for correctness.

+1 to adding the logic to fetch the latest checkpoint before promoting a replica to primary. Whenever you choose to implement it, a note on the code reference for RoutingNodes: that logic is executed while processing a new cluster state, which runs in the single-threaded executor for cluster state updates https://github.com/opensearch-project/OpenSearch/blob/7472aa94ae327ac893c42c538230e201757b679b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java#L527 so don't update that logic directly. Instead, add separate transport logic to first fetch this information and then promote the replica for segrep indices.
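The ordering suggested here (fetch first, then promote) can be sketched as below. This is only an illustration of the idea using Python thread pools as stand-ins: `fetch_checkpoint`, `apply_promotion`, and both executors are hypothetical placeholders for a transport action and the cluster state update executor, not OpenSearch APIs.

```python
from concurrent.futures import ThreadPoolExecutor


def promote_with_prefetch(replica_ids, fetch_checkpoint, apply_promotion,
                          cluster_state_executor, fetch_pool):
    """Fetch checkpoints off the cluster-state thread, then submit the promotion.

    fetch_checkpoint(replica_id) -> int   stands in for a transport request.
    apply_promotion(replica_id)           stands in for the cheap state update.
    """
    # Step 1: fan out the slow network calls on a separate pool, so the
    # single-threaded cluster state executor is never blocked waiting on them.
    futures = {rid: fetch_pool.submit(fetch_checkpoint, rid) for rid in replica_ids}
    checkpoints = {rid: f.result() for rid, f in futures.items()}
    if not checkpoints:
        return None

    # Step 2: choose the replica with the highest checkpoint.
    chosen = max(checkpoints, key=checkpoints.get)

    # Step 3: only the already-decided promotion runs on the single thread.
    return cluster_state_executor.submit(apply_promotion, chosen).result()


if __name__ == "__main__":
    latest = {"r1": 7, "r2": 9}
    with ThreadPoolExecutor(max_workers=1) as cs, ThreadPoolExecutor(max_workers=4) as pool:
        print(promote_with_prefetch(list(latest), latest.__getitem__,
                                    lambda rid: "promoted " + rid, cs, pool))
```

The point of the split is that the cluster state task receives a decision that has already been made, so its cost stays independent of how long the replicas take to answer.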

Alternatively, we could store this in cluster state after each replica updates to a new set of segments, so that cluster managers already have this state, yet this would be a frequent update.

This might turn out to be expensive: if segments are created every few seconds, it could result in too many requests to the ClusterManager, which wouldn't be preferred. The ClusterManager shouldn't be in the direct indexing path.