databendlabs / openraft

rust raft with improvements
Apache License 2.0

Feature: store heartbeat metric in RaftMetrics and RaftDataMetrics #1177

Closed: SteveLauC closed this 4 months ago

SteveLauC commented 4 months ago

Checklist

I will tick the checklist items once this PR is no longer a draft :)


What does this PR do

This PR attempts to implement the metric discussed in this thread, adding the heartbeat information (time of the last replication or heartbeat) to RaftDataMetrics and RaftMetrics so that users can subscribe to it.

This is a draft PR because I want to make sure the approach is basically correct first; after that, I will add tests and code documentation and polish the interfaces.

How it is implemented

For the time type, I used RaftTypeConfig::AsyncRuntime::Instant, since it is widely used.

I found that each follower's matching log ID (the last log ID known to be replicated to it) is stored in struct ProgressEntry, so I stored the time information there as well. When a response to a replication request is received, the replication handler updates it in ReplicationHandler::update_matching(), using the sending_time of the replication result.

These metrics are then sent to the tokio watch channel in RaftCore::report_metrics().
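A minimal sketch of the bookkeeping described above, under simplifying assumptions: `ProgressEntrySketch`, its fields, and the use of `u64` in place of a log ID are hypothetical, and `std::time::Instant` stands in for `RaftTypeConfig::AsyncRuntime::Instant` so the example runs on its own. It only illustrates the idea of updating the acknowledgement time in the same place as the matching log ID, using the time the request was sent.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for a per-follower progress entry.
/// `u64` stands in for a log ID; the real openraft types differ.
#[derive(Debug, Default)]
struct ProgressEntrySketch {
    /// Highest log index known to be replicated to the follower.
    matching: Option<u64>,
    /// Sending time of the request whose response most recently acknowledged this follower.
    last_ack: Option<Instant>,
}

impl ProgressEntrySketch {
    /// Record a successful replication (or heartbeat) response.
    /// `sending_time` is when the leader sent the request, not when the response arrived.
    fn update_matching(&mut self, matching: Option<u64>, sending_time: Instant) {
        // The matching log ID never moves backwards (`None` compares less than any `Some`).
        if matching > self.matching {
            self.matching = matching;
        }
        // Assumption: responses may arrive out of order, so keep the latest sending time.
        if self.last_ack.map_or(true, |t| sending_time > t) {
            self.last_ack = Some(sending_time);
        }
    }
}

fn main() {
    let t0 = Instant::now();
    let mut entry = ProgressEntrySketch::default();
    entry.update_matching(Some(5), t0);
    // A later heartbeat carries no new log entries but still refreshes the ack time.
    entry.update_matching(Some(5), t0 + Duration::from_millis(50));
    println!("{:?}", entry);
}
```

Keeping the time next to the matching log ID means both values are updated by the same response, so they cannot drift apart.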

Unsolved problems/Questions

There are several unsolved problems with this PR that I want to discuss with the maintainers before taking any further action.

  1. The serde::{Deserialize, Serialize} bounds

    RaftDataMetrics and RaftMetrics must satisfy these bounds when the serde feature of Openraft is enabled, but neither std::time::Instant nor tokio::time::Instant implements these traits.

  2. Should we use result.sending_time, or should we let the follower report the time at which it received the replication request? The latter seems more accurate.

  3. Tests

    It looks like most replication-related unit tests use hand-crafted log IDs to emulate specific cases. Should the time information be tested similarly? I guess not, since it is less important than the log ID.



drmingdrmer commented 4 months ago

It looks like you cannot use Instant in RaftMetrics, which requires serde.

The metrics may have to store the interval since the last acked heartbeat, just like millis_since_quorum_ack does: https://github.com/datafuselabs/openraft/blob/9dbcd4c0b6723fd4ac5d38ed032d8e656d0abc8d/openraft/src/metrics/raft_metrics.rs#L70
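Following that suggestion, a sketch of turning a stored instant into a serde-friendly interval in the spirit of `millis_since_quorum_ack`. The helper `millis_since` is hypothetical; `Instant::saturating_duration_since` is a standard library method.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: milliseconds elapsed since the last acknowledged heartbeat.
/// A plain `u64` can derive `Serialize`/`Deserialize`, while `Instant` cannot.
fn millis_since(last_ack: Option<Instant>, now: Instant) -> Option<u64> {
    // `saturating_duration_since` returns zero instead of panicking
    // if `last_ack` happens to be later than `now`.
    last_ack.map(|t| now.saturating_duration_since(t).as_millis() as u64)
}

fn main() {
    let acked_at = Instant::now();
    let now = acked_at + Duration::from_millis(250);
    println!("{:?}", millis_since(Some(acked_at), now)); // prints Some(250)
    println!("{:?}", millis_since(None, now)); // prints None: never acknowledged
}
```

One consequence of storing an interval rather than a timestamp is that the value is relative to the moment the metrics were reported, so a subscriber reading it later sees a slightly stale number.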

SteveLauC commented 4 months ago

Thanks for the review and guiding me!


It looks like you can not use Instant in RaftMetrics which requires serde.

The metrics may have to store the interval since the last acked heartbeat, just like millis_since_quorum_ack does:

Done

You do not need to store the ack time. There is already a field for it:

It can be transformed into a BTreeMap, as is done for progress -> replications:
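A sketch of that transformation, assuming the leader's progress can be iterated as `(node_id, last_ack)` pairs. The function `heartbeat_metrics`, the `u64` node IDs, and the slice input are hypothetical simplifications of openraft's progress structure; each node maps to the milliseconds since its last acknowledgement, or `None` if it has never acknowledged.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Hypothetical: build `node_id -> millis since last ack` from per-node progress,
/// analogous to building the `replication` map from the same progress data.
fn heartbeat_metrics(
    progress: &[(u64, Option<Instant>)],
    now: Instant,
) -> BTreeMap<u64, Option<u64>> {
    progress
        .iter()
        .map(|&(node_id, last_ack)| {
            let millis = last_ack.map(|t| now.saturating_duration_since(t).as_millis() as u64);
            (node_id, millis)
        })
        .collect()
}

fn main() {
    let start = Instant::now();
    let now = start + Duration::from_millis(20);
    let progress = [(1, None), (3, Some(now)), (2, Some(start))];
    // BTreeMap keeps node IDs sorted: {1: None, 2: Some(20), 3: Some(0)}
    println!("{:?}", heartbeat_metrics(&progress, now));
}
```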

Done

SteveLauC commented 4 months ago

naming a member time and especially w/o documentation is very confusing.

Please enhance the documentation to clarify the meaning of the value

Yes, I will handle them; I just want to make sure this PR is basically OK before doing so.

It is a draft PR as I want to ensure it looks basically correct, then I will try to add tests, code documents, and polish interfaces.

SteveLauC commented 4 months ago

Hi, I just added the code documentation.


We need to create a test to verify the functionality of these features.

Please include the heartbeat tests in this module:

https://github.com/datafuselabs/openraft/blob/ca156ead4af0624e990d1e1e069aab929a5379a3/tests/tests/metrics/main.rs#L10

Should I add the tests in this module or in tests/tests/metrics/t30_leader_metrics.rs? It looks like the latter is for metrics, and the replication tests are already there:

$ rg "replication"
t30_leader_metrics.rs
52:                if let Some(ref q) = x.replication {
61:            "no replication with 1 node cluster",
95:                if let Some(ref q) = x.replication {
102:            "replication metrics to 4 nodes",
128:        "--- replication metrics should reflect the replication state"
137:                    if let Some(ref q) = x.replication {
144:                "replication metrics to 3 nodes",
159:        n1.wait(timeout()).metrics(|x| x.replication.is_some(), "node-1 starts replication").await?;
161:        n0.wait(timeout()).metrics(|x| x.replication.is_none(), "node-0 stopped replication").await?;

Or maybe it should be added in tests/tests/metrics/t10_server_metrics_and_data_metrics.rs?

SteveLauC commented 4 months ago

The Display implementations should be consistent: always put heartbeat after replication.

Sure

SteveLauC commented 4 months ago

https://github.com/datafuselabs/openraft/actions/runs/10005367869/job/27655934359?pr=1177

failures:

---- t11_add_learner::check_learner_after_leader_transferred stdout ----
panicked at /home/runner/work/openraft/openraft/openraft/src/raft_state/log_state_reader.rs:25:13:
assertion failed: Some(log_id) <= self.committed()
/home/runner/work/openraft/openraft/openraft/src/raft_state/log_state_reader.rs:25:13
backtrace is disabled without --features 'bt'
panicked at /home/runner/work/openraft/openraft/openraft/src/raft_state/log_state_reader.rs:25:13:
assertion failed: Some(log_id) <= self.committed()
/home/runner/work/openraft/openraft/openraft/src/raft_state/log_state_reader.rs:25:13
backtrace is disabled without --features 'bt'
panicked at /home/runner/work/openraft/openraft/openraft/src/raft_state/log_state_reader.rs:25:13:
assertion failed: Some(log_id) <= self.committed()
/home/runner/work/openraft/openraft/openraft/src/raft_state/log_state_reader.rs:25:13
backtrace is disabled without --features 'bt'
Error: timeout after 3s when node in new cluster finally commit at least one blank leader-initialize log .applied_index>=10 latest: Metrics{id:3, Leader, term:2, vote:<T2-N3:Q>, last_log:10, last_applied:T1-N0.9, leader:3(since_last_ack:None ms), membership:{log_id:T1-N0.9, {voters:[{1:(),3:(),4:()}], learners:[2:()]}}, snapshot:None, purged:None, replication:{1:None,2:None,3:T2-N3.10,4:None}, heartbeat:{1:None,2:None,3:76,4:None}}

This test failure seems unrelated to this PR.

SteveLauC commented 4 months ago

Please rewrite allocation-free, e.g., by adding properly-implemented Display trait to HeartbeatMetrics.

This would make HeartbeatMetrics a wrapper type rather than a type alias, since we cannot implement std::fmt::Display for std::collections::BTreeMap due to the orphan rule. I will proceed and implement it (for HeartbeatMetrics and ReplicationMetrics) if you are OK with this change.
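A sketch of the wrapper approach, assuming `HeartbeatMetrics` wraps a `BTreeMap<u64, Option<u64>>` of node ID to milliseconds (the inner types are a hypothetical simplification). The orphan rule forbids implementing the foreign trait `Display` for the foreign type `BTreeMap`, so a local newtype is needed; writing straight into the `Formatter` avoids building an intermediate `String`. The output mimics the `heartbeat:{1:None,2:None,3:76,4:None}` shape seen in the CI log above.

```rust
use std::collections::BTreeMap;
use std::fmt;

/// Hypothetical shape: node ID -> milliseconds since the last acknowledged heartbeat.
/// A newtype is required because of the orphan rule.
pub struct HeartbeatMetrics(pub BTreeMap<u64, Option<u64>>);

impl fmt::Display for HeartbeatMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("{")?;
        for (i, (node_id, millis)) in self.0.iter().enumerate() {
            if i > 0 {
                f.write_str(",")?;
            }
            // Write each entry directly into the formatter; no temporary `String`.
            match millis {
                Some(ms) => write!(f, "{}:{}", node_id, ms)?,
                None => write!(f, "{}:None", node_id)?,
            }
        }
        f.write_str("}")
    }
}

fn main() {
    let mut metrics = BTreeMap::new();
    metrics.insert(3, Some(76));
    metrics.insert(1, None);
    // Entries print in node-ID order.
    println!("{}", HeartbeatMetrics(metrics)); // prints {1:None,3:76}
}
```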

drmingdrmer commented 4 months ago

@schreter Shall we consider upgrading Duration in milliseconds to Instant in a separate PR to keep the changes in this PR more manageable?

And GPT-4o has given me a working example of serializing Instant 🤔 :

Below is a complete example that includes both serialization and deserialization for std::time::Instant using serde. We will create a newtype wrapper around Instant and implement both Serialize and Deserialize.

  1. Add dependencies: Ensure you have the necessary dependencies in your Cargo.toml:

    [dependencies]
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
  2. Create wrapper and implement serialization and deserialization:

    use serde::de::{self, Visitor};
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::fmt;
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    // Newtype wrapper around `Instant`
    #[derive(Debug)]
    struct SerializableInstant(Instant);

    impl Serialize for SerializableInstant {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            // Convert `Instant` to `SystemTime` by applying the stored instant's
            // offset from `Instant::now()` to `SystemTime::now()`.
            // `Instant - Instant` saturates to zero, so past and future instants
            // must be handled separately.
            let (now_instant, now_system) = (Instant::now(), SystemTime::now());
            let system_time = match self.0.checked_duration_since(now_instant) {
                Some(ahead) => now_system + ahead,
                None => now_system - now_instant.duration_since(self.0),
            };
            // Get the duration since the Unix epoch
            let duration = system_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
            // Serialize the duration as whole seconds since the epoch
            // (sub-second precision is dropped)
            serializer.serialize_u64(duration.as_secs())
        }
    }

    impl<'de> Deserialize<'de> for SerializableInstant {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: Deserializer<'de>,
        {
            struct InstantVisitor;

            impl<'de> Visitor<'de> for InstantVisitor {
                type Value = SerializableInstant;

                fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
                    formatter.write_str("a u64 representing seconds since the Unix epoch")
                }

                fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
                where
                    E: de::Error,
                {
                    let system_time = UNIX_EPOCH
                        .checked_add(Duration::from_secs(value))
                        .ok_or_else(|| E::custom("timestamp is out of range for SystemTime"))?;
                    let (now_instant, now_system) = (Instant::now(), SystemTime::now());
                    // Shift `Instant::now()` by the offset between the decoded
                    // `SystemTime` and the current `SystemTime`. Checked arithmetic
                    // is used because `Instant` may not represent far-away points.
                    let instant = match system_time.duration_since(now_system) {
                        Ok(ahead) => now_instant.checked_add(ahead),
                        Err(behind) => now_instant.checked_sub(behind.duration()),
                    };
                    instant
                        .map(SerializableInstant)
                        .ok_or_else(|| E::custom("timestamp is out of range for Instant"))
                }
            }

            deserializer.deserialize_u64(InstantVisitor)
        }
    }

    // Example usage
    fn main() {
        let now = Instant::now();
        let serializable_now = SerializableInstant(now);

        // Serialize to JSON
        let json = serde_json::to_string(&serializable_now).unwrap();
        println!("Serialized instant: {}", json);

        // Deserialize from JSON
        let deserialized_instant: SerializableInstant = serde_json::from_str(&json).unwrap();
        println!("Deserialized instant: {:?}", deserialized_instant);
    }

Explanation

  1. Serialization:

    • We convert Instant to SystemTime by measuring how far self.0 (the stored Instant) lies before or after Instant::now(), and then applying that offset to SystemTime::now().
    • We then get the duration since the Unix epoch and serialize it as a u64 representing seconds since the epoch.
  2. Deserialization:

    • We define a Visitor to handle the deserialization of a u64 value.
    • We convert the u64 back to a SystemTime by adding the duration to the Unix epoch.
    • We then convert SystemTime back to Instant by applying the offset between the decoded SystemTime and the current SystemTime to Instant::now().

This lets us serialize an Instant to JSON and deserialize it back to an Instant. The Instant is stored as the number of seconds since the Unix epoch, which is a common and portable format for timestamps. Note that the round trip is only approximate: sub-second precision is dropped, and the mapping depends on the wall clock at both ends.

SteveLauC commented 4 months ago

Shall we consider upgrading Duration in milliseconds to Instant in a separate PR to keep the changes in this PR more manageable?

I am also not sure whether that allocation refactor should be done in this PR; it deserves a separate PR if we want each PR to do only one thing 🤪

SteveLauC commented 4 months ago

I just finished that display refactor:)

SteveLauC commented 4 months ago

Given that DisplayBTreeMapOptValue is already available, you can simplify the expression like this

I wrote it that way so that there is no allow-unused attribute in display_btreemap_opt_value.rs, only the one in display_ext.rs, to be consistent with the other display_xxx.rs files. I agree it's kind of weird; I can switch to the approach you suggested if you don't mind another unused attribute.

drmingdrmer commented 4 months ago

Given that DisplayBTreeMapOptValue is already available, you can simplify the expression like this

I wrote it in that way so that there won't be a allow unused attribute in display_btreemap_opt_value.rs, only one in display_ext.rs, to be consistent with other display_xxx.rs files, I agree it's kinda weird, can change to approach you suggested if you don't mind another unused attribute.

I do not quite mind about the unused import :)

SteveLauC commented 4 months ago

This snippet could be simplified to:

Done


I do not quite mind about the unused import :)

Done


I don't quite get it. Why are these imports unused? Are they just written for future use and not really used in the crate (yet)?

I am not sure about this either; I just followed the existing convention:


Previously, drmingdrmer (张炎泼) wrote…

Or even further to:

They are basically the same, so I used the one suggested by @drmingdrmer.

SteveLauC commented 4 months ago

When you think this PR is in its last review round, ping me and I will squash my commits :)

SteveLauC commented 4 months ago

Gentle ping on @schreter, can I trouble you for another review?

drmingdrmer commented 4 months ago

Gentle ping on @schreter, can I trouble you for another review?

It's been good enough to me. Please squash the commits and let's merge it.

@schreter is sometimes slow to give feedback. Don't worry: if any refinements are needed, he will provide feedback soon, and you can address it in the next PR. This has become a routine working pattern :)

SteveLauC commented 4 months ago

Rust nightly 1.82 renamed std::panic::PanicInfo to std::panic::PanicHookInfo and marked the old name as deprecated; this change currently only affects nightly toolchains 1.82.0 and later.

It looks like we have to use something like rustversion to handle it? 🤔 Or we can just #[allow(deprecated)] it.

drmingdrmer commented 4 months ago

Rust nightly 1.82 renamed std::panic::PanicInfo to std::panic::PanicHookInfo and mark the old name deprecated, this change is currently exclusive to nightly toolchain 1.82.0 and higher.

Looks like we have to use something like rustversion to handle it? 🤔 Or we can #[allow(deprecated)] it.

It looks like a single #[allow(deprecated)] would be good enough.
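A sketch of the single-attribute approach, assuming a helper that formats the information passed to a panic hook (the function `format_panic` is hypothetical). Keeping the old `PanicInfo` name compiles on toolchains that predate `PanicHookInfo`, and `#[allow(deprecated)]` silences the warning on toolchains where `PanicInfo` is a deprecated alias.

```rust
use std::panic;

/// Hypothetical helper that formats the information passed to a panic hook.
/// On newer toolchains `PanicInfo` is a deprecated alias of `PanicHookInfo`;
/// older toolchains only know `PanicInfo`. Allowing the lint works on both.
#[allow(deprecated)]
fn format_panic(info: &panic::PanicInfo<'_>) -> String {
    // `Display` for the hook info includes the panic message and its location.
    format!("{}", info)
}

fn main() {
    panic::set_hook(Box::new(|info| {
        eprintln!("hook: {}", format_panic(info));
    }));
    let result = panic::catch_unwind(|| panic!("boom"));
    assert!(result.is_err());
    // Restore the default hook.
    let _ = panic::take_hook();
}
```

Compared with version-gating via a crate such as rustversion, the attribute keeps a single code path and adds no dependency, at the cost of using a name that is slated for removal.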

SteveLauC commented 4 months ago

It looks like a single #[allow(deprecated)] would be good enough.

Let me do this in a separate PR.

drmingdrmer commented 4 months ago

It looks like a single #[allow(deprecated)] would be good enough.

Let me do this in a separate PR.

I'm gonna include this fixup in this:

SteveLauC commented 4 months ago

I'm gonna include this fixup in this:

Feel free to close #1188 if you need to 😁

SteveLauC commented 4 months ago

I actually already did, but since I didn't add any additional comments you probably didn't get any mail. I just removed the blocker/marked the Display formatting as "done" in Reviewable.

Ahh, sorry for missing that, looks like I really need to learn how to use Reviewable. 😶‍🌫️