paradigmxyz / reth

Modular, contributor-friendly and blazing-fast implementation of the Ethereum protocol, in Rust
https://reth.rs/
Apache License 2.0

Disconnect slowest peers #908

Open mattsse opened 1 year ago

mattsse commented 1 year ago

Describe the feature

with #891 we aim to keep all peers busy with requests during sync.

Slow peers will be a bottleneck, and should get auto-disconnected if

  1. at least x peers are active
  2. the slowest peer's timeout is larger than the fastest peer's by some metric.

TODO

Additional context

No response

akirillo commented 1 year ago

Synthesizing some initial thoughts:


Factors to consider for dropping / ranking

  1. Latency (we want to minimize this)
  2. Throughput (we want to maximize this)

How to measure factors

  1. Latency
    • We have an indication of this with peer.timeout, but I believe we should use that only for actually timing out a peer, and use a separate measurement for ranking (which peer to send the next request to) and for dropping.
    • peer.timeout only ever increases, and may get bumped when the peer in question is dealing with a burst of traffic and times out once, yet the peer is then penalized forever.
    • I believe a better metric would be a moving average of round trip time over the last $n$ requests (see the sketch after this list)
  2. Throughput
    • With #783 we'll be measuring per-stream bandwidth; we need to expose this as a rate (over some interval $p$) to get a measure of throughput
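
To make the measurement side concrete, here is a minimal Rust sketch of a per-peer tracker along these lines; the PeerPerf type, the window size, and the field names are assumptions for illustration, not existing reth APIs:

use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical per-peer performance tracker (not an existing reth type).
struct PeerPerf {
    /// Round-trip times of the last `LATENCY_WINDOW` completed requests.
    rtts: VecDeque<Duration>,
    /// Bytes received from this peer over the last measurement interval (fed by the #783 metering).
    bytes_in_interval: u64,
    /// The measurement interval `p`.
    interval: Duration,
}

/// Assumed window size `n` for the latency moving average.
const LATENCY_WINDOW: usize = 16;

impl PeerPerf {
    /// Record the round trip time of a completed request.
    fn record_rtt(&mut self, rtt: Duration) {
        if self.rtts.len() == LATENCY_WINDOW {
            self.rtts.pop_front();
        }
        self.rtts.push_back(rtt);
    }

    /// Latency metric: moving average of RTT over the last `n` requests.
    fn avg_latency(&self) -> Duration {
        if self.rtts.is_empty() {
            return Duration::ZERO;
        }
        self.rtts.iter().sum::<Duration>() / self.rtts.len() as u32
    }

    /// Throughput metric: bytes per second over the interval `p`.
    fn throughput(&self) -> f64 {
        self.bytes_in_interval as f64 / self.interval.as_secs_f64()
    }
}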

How to use factors for dropping

I'd propose a multi-tiered system for both latency & throughput. Let me introduce some parameters:

  • $a$: the number of currently active peers
  • $x$: the desired number of active peers, below which we're reluctant to drop any
  • $l_h$ / $l_l$: the high- and low-severity latency thresholds, with $l_h$ > $l_l$
  • $t_h$ / $t_l$: the high- and low-severity throughput thresholds, with $t_h$ < $t_l$

The logic for dropping a peer is as follows:

if $a$ < $x$:
    // we don't have many active peers so we're eager to keep more peers around,
    // only drop if they have high-severity latency/throughput

    if `peer.latency` >= $l_h$: drop peer

    if `peer.throughput` <= $t_h$: drop peer

else:
    // we have many active peers, are more aggressive about dropping bad ones,
    // drop if they have low-severity latency/throughput

    if `peer.latency` >= $l_l$: drop peer

    if `peer.throughput` <= $t_l$: drop peer

The rationale for this is that we may want to drop a bad peer even if we have < $x$ active peers. Wasting time waiting around for a truly bad peer to time out when we could be sending the same request to some new peer that we just discovered is sub-optimal.

For the sake of simplicity, though, we could start by only dropping peers when $a$ >= $x$ and only use the "low-severity" thresholds
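
For illustration, the pseudocode above could translate to something like the following Rust sketch; the threshold constants and the should_drop helper are hypothetical placeholders, not tuned values:

use std::time::Duration;

// Hypothetical thresholds; real values need tuning.
const HIGH_SEVERITY_LATENCY: Duration = Duration::from_secs(5); // l_h
const LOW_SEVERITY_LATENCY: Duration = Duration::from_secs(2);  // l_l
const HIGH_SEVERITY_THROUGHPUT: f64 = 50_000.0;  // t_h, bytes/s
const LOW_SEVERITY_THROUGHPUT: f64 = 200_000.0;  // t_l, bytes/s

/// `active` = number of active peers (a), `target` = desired number of active peers (x).
fn should_drop(active: usize, target: usize, latency: Duration, throughput: f64) -> bool {
    if active < target {
        // few active peers: only drop on high-severity violations
        latency >= HIGH_SEVERITY_LATENCY || throughput <= HIGH_SEVERITY_THROUGHPUT
    } else {
        // enough peers: be more aggressive, drop on low-severity violations too
        latency >= LOW_SEVERITY_LATENCY || throughput <= LOW_SEVERITY_THROUGHPUT
    }
}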

How to use factors for ranking

Currently, we use peer.timeout to sort peers and determine which should handle the next request (goes to the lowest-timeout idle peer).

For the reasons mentioned in the "How to measure factors" section, I believe we shouldn't use peer.timeout for this ranking, and should instead use the same measurement that we use for dropping (moving average of round trip time).

However, we now have two factors in play: latency and throughput. How should we rank the peers along these two axes?

This is something I would love input on, but broadly I get the sense that throughput is more important than latency.
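
One possible (purely illustrative) way to combine them would be a single score normalized against the current best peer, with throughput weighted more heavily; the rank_score helper and the weights below are assumptions, not a settled proposal:

use std::time::Duration;

/// Higher score = better peer. Weights and normalization are illustrative only.
fn rank_score(latency: Duration, throughput: f64, best_latency: Duration, best_throughput: f64) -> f64 {
    // normalize both metrics against the current best peer so they are comparable
    let latency_score = best_latency.as_secs_f64() / latency.as_secs_f64().max(f64::EPSILON);
    let throughput_score = throughput / best_throughput.max(f64::EPSILON);
    // weight throughput more heavily than latency
    0.3 * latency_score + 0.7 * throughput_score
}

Normalizing against the best peer keeps the score meaningful as network conditions change; the 0.3/0.7 split is just a starting point.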

Other considerations

Dropped peer blacklist

We should maintain a blacklist of peers that we have previously dropped so that if we re-discover them, we don't try to connect again.

We could even consider persisting this blacklist when the node shuts down.

This list should be implemented as a ring buffer of fixed size so that:

  1. It doesn't open up a DoS vector
  2. It gives dropped peers an intermittent chance to be re-discovered if they're overwritten in the ring. Since their performance (latency, throughput) is variable and may improve, we may not want to blacklist them forever.

We could even play around with fancier structures like an LRU cache for this (haven't thought much about it)
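
For illustration, a minimal sketch of the fixed-size ring idea could look like this (assuming PeerId is the existing peer identifier type; everything else here is hypothetical):

use std::collections::VecDeque;

/// Stand-in for reth's peer identifier type.
type PeerId = [u8; 64];

/// Fixed-capacity blacklist: when full, the oldest entry is evicted, which
/// bounds memory (no DoS vector) and gives dropped peers a chance to be
/// re-discovered later.
struct DroppedPeers {
    ring: VecDeque<PeerId>,
    capacity: usize,
}

impl DroppedPeers {
    fn new(capacity: usize) -> Self {
        Self { ring: VecDeque::with_capacity(capacity), capacity }
    }

    fn insert(&mut self, peer: PeerId) {
        if self.ring.len() == self.capacity {
            self.ring.pop_front();
        }
        self.ring.push_back(peer);
    }

    fn contains(&self, peer: &PeerId) -> bool {
        self.ring.contains(peer)
    }
}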

mattsse commented 1 year ago

love this overview.

> …out a peer and use a separate measurement that we use for ranking (which peer to send the next request to) and for dropping.

agree, we need a separate value here.

> we don't have many active peers so we're eager to keep more peers around

right, this should also depend on the number of connected peers.

> to get a measure of throughput

we need to take into account that header requests are generally cheaper than block requests, since there is less data to read from the DB

> Dropped peer blacklist

we have this in PeersManager which supports banning indefinitely or until a certain timestamp

akirillo commented 1 year ago

> agree, we need a separate value here.

Yeah, I think we should use the latency & throughput measurements defined above, but as far as using them to sort peers, I'm still not sure how to combine the two metrics. Very open to suggestions here.

> right, this should also depend on the number of connected peers.

Mm, good call - will look into the difference between "active" and "connected," but my reflex is that just using the number of connected peers is the right behavior. Since we're already connected with them, we will at some point try to communicate with them on a subprotocol, and then we'll be able to evaluate if they're slow/not and if they should be dropped.

> we need to take into account that header requests are generally cheaper than block requests, since there is less data to read from the DB

Hmm, good point, but the pipeline structure may make this simpler for us. The throughput thresholds are relative to the current best peer, and at any given point all the peers we're communicating with are handling the same sort of request - i.e. either we're requesting headers from all peers or bodies from all peers, but not headers from some and bodies from the rest. So they'll always be in the same "throughput regime", if that makes sense.

Unless... this breaks down with the new downloader component. Will we be requesting headers and bodies from peers simultaneously?

> we have this in PeersManager which supports banning indefinitely or until a certain timestamp

Ahh ok sick, my reflex here is to ban indefinitely if it was a "high-severity" disconnect and ban for some constant amount of time if it was a "low-severity" disconnect.
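
A tiny sketch of that policy, with made-up durations and a hypothetical severity enum:

use std::time::Duration;

enum DisconnectSeverity {
    Low,
    High,
}

/// `None` = ban indefinitely, `Some(d)` = temporary ban for duration `d` (placeholder value).
fn ban_duration(severity: DisconnectSeverity) -> Option<Duration> {
    match severity {
        DisconnectSeverity::High => None,
        DisconnectSeverity::Low => Some(Duration::from_secs(10 * 60)),
    }
}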

mattsse commented 1 year ago

between "active" and "connected,"

this is actually the same, should unify naming here or at least properly document.

I chose connected for the API functions because from the outside you don't care about active vs pending connection.

> …of time if it was a "low-severity" disconnect

we also have this in the form of `BackoffKind::{Low, Medium, High}`, from which we derive the temp ban duration:

https://github.com/paradigmxyz/reth/blob/eb11da8adffc522e07490da89d9a295410dc1436/crates/net/network/src/error.rs#L43-L50

akirillo commented 1 year ago

Marking as blocked by #992

akirillo commented 1 year ago

Have looked into the code and specced this out a bit further:


Consolidation into Peer.reputation

I think that all decisions regarding whether a peer should be banned temporarily, banned from discovery, and/or disconnected, as well as the order in which we rotate through peers, should be based on its reputation score. This is a clean abstraction and represents a central location in the code where all such logic is implemented. The 2-tier logic described in the previous comment can still be applied, and will dictate how much to change the peer's reputation by.
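
To illustrate the abstraction (assuming reputation is an integer score; the cutoff values below are placeholders, not reth's actual reputation constants):

/// Placeholder type alias and cutoffs, for illustration only.
type Reputation = i32;

const DISCONNECT_REPUTATION: Reputation = -500;
const BAN_REPUTATION: Reputation = -1_000;

fn action_for(reputation: Reputation) -> &'static str {
    if reputation <= BAN_REPUTATION {
        "ban (temporarily or indefinitely) and disconnect"
    } else if reputation <= DISCONNECT_REPUTATION {
        "disconnect, but allow re-connecting later"
    } else {
        "keep the peer; reputation only affects ranking"
    }
}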

When / where to disconnect slow peers

Currently, every other place in the code where we ban/disconnect a peer does so in response to a concrete event or error, e.g. when a pending session is dropped due to a fatal protocol error, or on an invalid message from a peer. IMO it doesn't make sense to be as reactive when it comes to disconnecting slow peers - "becoming slow" is not an event to respond to, so I don't see a particularly appropriate place in the code where we could hook in and say "now, iterate over the peers and drop the slow ones."

I think polling an interval within PeersManager.poll is the right abstraction here, similar to how we poll an interval to unban peers.

Necessary state

The most important new state necessary for this functionality should be able to fit within the Peer and PeersManager structs.

To the PeersManager struct, we add a single field:

struct PeersManager {
    // snip
    performance_evaluation_interval: Interval,
    // snip
}

Where performance_evaluation_interval is the Interval on which we evaluate performance (some aggregate measure of latency / throughput) for each peer.
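
For illustration, and assuming a tokio Interval like the one already polled for unbanning, constructing the field might look like this; the constant name and the 30-second period are made-up placeholders that would need tuning:

use std::time::Duration;
use tokio::time::{interval, Interval};

/// Hypothetical evaluation period; the actual value would need tuning.
const PEER_PERFORMANCE_EVAL_PERIOD: Duration = Duration::from_secs(30);

/// Would be called when building `PeersManager` to initialize the field.
fn performance_evaluation_interval() -> Interval {
    interval(PEER_PERFORMANCE_EVAL_PERIOD)
}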

To the Peer struct, we add the following:

struct Peer {
    // snip
    metered_stream_counts: MeteredStreamCounts,
    ingress_throughput_window: [f64; THROUGHPUT_WINDOW_SIZE],
    egress_throughput_window: [f64; THROUGHPUT_WINDOW_SIZE],
    latency_window: [Duration; LATENCY_WINDOW_SIZE],
    // snip
}

Where:

  • metered_stream_counts holds the per-stream bandwidth counters introduced with #783 (read via atomic loads in the sketch below)
  • ingress_throughput_window / egress_throughput_window are rolling windows of the last THROUGHPUT_WINDOW_SIZE throughput samples
  • latency_window is a rolling window of the round trip times of the last LATENCY_WINDOW_SIZE requests

FWIW, we could use a VecDeque for the windows (the API is a bit more ergonomic), but a fixed-size window reaps the memory benefits.

Additionally, we'll need to make changes to ReputationChangeKind and ReputationChangeWeights to reflect these new reasons (throughput, latency) for changing a peer's reputation:

pub enum ReputationChangeKind {
    // snip
    HighLatency,
    SeverelyHighLatency,
    LowThroughput,
    SeverelyLowThroughput,
    // snip
}

pub struct ReputationChangeWeights {
    // snip
    high_latency: Reputation,
    severely_high_latency: Reputation,
    low_throughput: Reputation,
    severely_low_throughput: Reputation,
    // snip
}

The exact weights for these new ReputationChangeKinds still need design, but broadly I think throughput should have more weight than latency.
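
For example, initial values could look something like the sketch below, with throughput violations penalized more heavily than latency ones; all numbers are placeholders pending actual design, and a Default impl for the existing fields is assumed:

/// Only the new weights are shown; existing fields come from the assumed default.
fn slow_peer_weights() -> ReputationChangeWeights {
    ReputationChangeWeights {
        // placeholder values: throughput violations weigh more than latency ones
        high_latency: -50,
        severely_high_latency: -200,
        low_throughput: -100,
        severely_low_throughput: -400,
        ..Default::default()
    }
}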

Finally, we need a quick enum to indicate whether we want to drop peers with high- or low-severity latency/throughput:

enum SeverityThreshold {
    Low,
    High,
}

Necessary logic

The core entry point into this functionality would be in the PeersManager.poll method, where we calculate aggregate values for each peer's throughput and latency. Of course, the code below should be taken with a grain of salt (and abstraction!), but it should convey the gist:

pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
    // snip
    if self.performance_evaluation_interval.poll_tick(cx).is_ready() {
        let total_connected_peers = self.connection_info.num_outbound + self.connection_info.num_inbound;
        let max_connected_peers = self.connection_info.max_outbound + self.connection_info.max_inbound;
        // float division, otherwise integer division truncates the utilization to 0 or 1
        let connection_utilization = total_connected_peers as f64 / max_connected_peers as f64;
        let severity_threshold = if connection_utilization < DESIRED_CONNECTION_UTILIZATION {
            SeverityThreshold::High
        } else {
            SeverityThreshold::Low
        };

        let ingress_throughput_threshold;
        let egress_throughput_threshold;
        let latency_threshold;
        let throughput_rep_change_kind;
        let latency_rep_change_kind;

        match severity_threshold {
            SeverityThreshold::Low => {
                ingress_throughput_threshold = LOW_SEVERITY_INGRESS_THROUGHPUT_THRESHOLD;
                egress_throughput_threshold = LOW_SEVERITY_EGRESS_THROUGHPUT_THRESHOLD;
                latency_threshold = LOW_SEVERITY_LATENCY_THRESHOLD;
                throughput_rep_change_kind = ReputationChangeKind::LowThroughput;
                latency_rep_change_kind = ReputationChangeKind::HighLatency;
            }
            SeverityThreshold::High => {
                ingress_throughput_threshold = HIGH_SEVERITY_INGRESS_THROUGHPUT_THRESHOLD;
                egress_throughput_threshold = HIGH_SEVERITY_EGRESS_THROUGHPUT_THRESHOLD;
                latency_threshold = HIGH_SEVERITY_LATENCY_THRESHOLD;
                throughput_rep_change_kind = ReputationChangeKind::SeverelyLowThroughput;
                latency_rep_change_kind = ReputationChangeKind::SeverelyHighLatency;
            }
        };

        // Buffer the reputation changes so we don't call `&mut self` methods
        // while still iterating over `self.peers`
        let mut reputation_changes = Vec::new();

        for (peer_id, peer) in self.peers.iter_mut() {
            // Update throughput windows (assuming VecDeque API)
            peer.ingress_throughput_window.pop_front();
            peer.ingress_throughput_window.push_back(peer.metered_stream_counts.ingress_throughput.load(Ordering::Relaxed));

            peer.egress_throughput_window.pop_front();
            peer.egress_throughput_window.push_back(peer.metered_stream_counts.egress_throughput.load(Ordering::Relaxed));

            // Evaluate aggregate throughput measures
            let ingress_throughput_aggregate = aggregate_throughput(&peer.ingress_throughput_window);
            let egress_throughput_aggregate = aggregate_throughput(&peer.egress_throughput_window);

            // Latency window will get updated in `update_request_timeout` (?) (TBD)

            // Evaluate aggregate latency measure
            let latency_aggregate = aggregate_latency(&peer.latency_window);

            // Record reputation changes
            if ingress_throughput_aggregate < ingress_throughput_threshold || egress_throughput_aggregate < egress_throughput_threshold {
                reputation_changes.push((*peer_id, throughput_rep_change_kind));
            }

            if latency_aggregate > latency_threshold {
                reputation_changes.push((*peer_id, latency_rep_change_kind));
            }
        }

        // Apply reputation changes
        for (peer_id, kind) in reputation_changes {
            self.apply_reputation_change(&peer_id, kind);
        }
    }
    // snip
}

We may also need to hook in a way to add a backoff to the ban depending on severity, but perhaps this should be part of a more general refactor to apply_reputation_change - not sure, need to think about it.

akirillo commented 1 year ago

Per discussion in #1051, throughput/latency should be evaluated w/in the context of the data requested.

E.g., if some peer has been serving us full transactions, their latency will look abysmal compared to some peer that has been serving us transaction hashes.

The current plan was to just compare throughput/latency to that of the best recorded values across all peers, but now I think it may be better to track best throughput/latency per request type.

Lmk if this is overkill; it requires a lot more state per peer. Maybe the worse latency of data-heavy requests will be offset by their higher throughput, and we already have all the state we need to balance good peers serving small requests against good peers serving large requests.
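
For illustration, a rough sketch of per-request-type tracking; the RequestKind and Best types are hypothetical, not existing reth types:

use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical request categories to track separately.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum RequestKind {
    Headers,
    Bodies,
    PooledTransactions,
    Receipts,
}

/// Hypothetical per-kind record of the best values seen across all peers.
#[derive(Default)]
struct Best {
    latency: Option<Duration>,
    throughput: f64,
}

/// Best observed latency/throughput, kept per request type so that e.g.
/// full-body responses are never compared against transaction-hash responses.
#[derive(Default)]
struct BestPerRequestKind {
    trackers: HashMap<RequestKind, Best>,
}

impl BestPerRequestKind {
    fn observe(&mut self, kind: RequestKind, latency: Duration, throughput: f64) {
        let best = self.trackers.entry(kind).or_default();
        if best.latency.map_or(true, |b| latency < b) {
            best.latency = Some(latency);
        }
        if throughput > best.throughput {
            best.throughput = throughput;
        }
    }
}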

mempirate commented 1 year ago

100% agree on having different trackers per request type. By definition requesting a transaction by hash is going to be faster than requesting a block body. It's how geth does it as well in https://github.dev/ethereum/go-ethereum/blob/55f41d198c133d672cdba258cdca40a9cb23005c/p2p/msgrate/msgrate.go

However, they only use this tracking system during sync, and not after. They also do not penalize peers for any timeouts, but they do keep track of which peer is best to send the next request to, according to the trackers. To see this in action, check out https://github.dev/ethereum/go-ethereum/blob/master/eth/downloader/fetchers_concurrent.go