xline-kv / Xline

A geo-distributed KV store for metadata management
Apache License 2.0

[Improvement] Return early in fast_round when the number of `KeyConflict` errors is greater than or equal to the recover quorum #825

Closed by Phoenix500526 3 weeks ago

Phoenix500526 commented 1 month ago

Currently, fast_round in crates/curp/src/client/unary.rs looks like this:

/// Send proposal to all servers
pub(super) async fn fast_round(
    &self,
    propose_id: ProposeId,
    cmd: &C,
    token: Option<&String>,
) -> Result<Result<C::ER, C::Error>, CurpError> {
    let req = ProposeRequest::new(propose_id, cmd, self.state.cluster_version().await);
    let timeout = self.config.propose_timeout;

    let mut responses = self
        .for_each_server(|conn| {
            let req_c = req.clone();
            let token_c = token.cloned();
            async move { (conn.id(), conn.propose(req_c, token_c, timeout).await) }
        })
        .await;
    let super_quorum = super_quorum(responses.len());

    let mut err: Option<CurpError> = None;
    let mut execute_result: Option<C::ER> = None;
    let mut ok_cnt = 0;

    while let Some((id, resp)) = responses.next().await {
        let resp = match resp {
            Ok(resp) => resp.into_inner(),
            Err(e) => {
                warn!("propose cmd({propose_id}) to server({id}) error: {e:?}");
                if e.should_abort_fast_round() {
                    return Err(e);
                }
                if let Some(old_err) = err.as_ref() {
                    if old_err.priority() <= e.priority() {
                        err = Some(e);
                    }
                } else {
                    err = Some(e);
                }
                continue;
            }
        };
        let deserialize_res = resp.map_result::<C, _, Result<(), C::Error>>(|res| {
            let er = match res {
                Ok(er) => er,
                Err(cmd_err) => return Err(cmd_err),
            };
            if let Some(er) = er {
                assert!(execute_result.is_none(), "should not set exe result twice");
                execute_result = Some(er);
            }
            ok_cnt += 1;
            Ok(())
        });
        let dr = match deserialize_res {
            Ok(dr) => dr,
            Err(ser_err) => {
                warn!("serialize error: {ser_err}");
                // We blame this error to the server, although it may be a local error.
                // We need to retry as same as a server error.
                err = Some(CurpError::from(ser_err));
                continue;
            }
        };
        if let Err(cmd_err) = dr {
            // got a command execution error early, abort the next requests and return the cmd error
            return Ok(Err(cmd_err));
        }
        // if the propose meets the super quorum and we got the execute result,
        // that means we can safely abort the next requests
        if ok_cnt >= super_quorum {
            if let Some(er) = execute_result {
                debug!("fast round for cmd({}) succeed", propose_id);
                return Ok(Ok(er));
            }
        }
    }

    if let Some(err) = err {
        return Err(err);
    }

    // We will at least send the request to the leader if no `WrongClusterVersion` returned.
    // If no errors occur, the leader should return the ER
    // If it is because the super quorum has not been reached, an error will definitely occur.
    // Otherwise, there is no leader in the cluster state currently, return wrong cluster version
    // and attempt to retrieve the cluster state again.
    Err(CurpError::wrong_cluster_version())
}

As you can see, KeyConflict errors are not counted. We could count them: once the number of KeyConflict errors is greater than or equal to recover_quorum, which is about a quarter of the number of nodes in the cluster, the super quorum condition can never be met, so we can return early from the fast round instead of waiting for the remaining responses.
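The counting idea can be sketched in isolation. The following is a minimal, standalone model and not the actual Xline code: the `Reply` and `FastRound` types and the `fast_round_outcome` function are hypothetical, and the quorum formulas are assumptions (majority quorum `n / 2 + 1`, recover quorum `quorum / 2 + 1`, super quorum `(n - quorum) + recover_quorum`), so the real helpers in the curp crate may differ.

```rust
/// Majority quorum (assumed formula: n / 2 + 1).
fn quorum(nodes: usize) -> usize {
    nodes / 2 + 1
}

/// Recover quorum (assumed formula: quorum / 2 + 1).
fn recover_quorum(nodes: usize) -> usize {
    quorum(nodes) / 2 + 1
}

/// Super quorum (assumed formula: fault tolerance + recover quorum).
fn super_quorum(nodes: usize) -> usize {
    (nodes - quorum(nodes)) + recover_quorum(nodes)
}

/// Hypothetical, simplified view of one server's reply to a proposal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Reply {
    Accepted,
    KeyConflict,
    OtherError,
}

/// Hypothetical outcome of the simplified fast round.
#[derive(Debug, PartialEq, Eq)]
enum FastRound {
    /// Enough servers accepted the proposal.
    Succeeded,
    /// Too many conflicts: the super quorum is unreachable after `seen` replies.
    AbortedEarly { seen: usize },
    /// All replies were consumed without reaching either threshold.
    Failed,
}

/// Process replies in arrival order and stop as soon as the result is decided.
fn fast_round_outcome(nodes: usize, replies: &[Reply]) -> FastRound {
    let mut ok_cnt = 0usize;
    let mut conflict_cnt = 0usize;
    for (i, reply) in replies.iter().enumerate() {
        match reply {
            Reply::Accepted => ok_cnt += 1,
            Reply::KeyConflict => conflict_cnt += 1,
            Reply::OtherError => {}
        }
        if ok_cnt >= super_quorum(nodes) {
            return FastRound::Succeeded;
        }
        // With this many conflicts, at most nodes - recover_quorum servers can
        // still accept, which is below the super quorum under the formulas above.
        if conflict_cnt >= recover_quorum(nodes) {
            return FastRound::AbortedEarly { seen: i + 1 };
        }
    }
    FastRound::Failed
}
```

Under these assumed formulas, a 5-node cluster has a super quorum of 4 and a recover quorum of 2, so two KeyConflict replies leave at most 3 possible acceptances and the round can stop without waiting for the remaining servers.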


realtaobo commented 3 weeks ago
