paradigmxyz / reth

Modular, contributor-friendly and blazing-fast implementation of the Ethereum protocol, in Rust
https://reth.rs/
Apache License 2.0
3.86k stars 1.11k forks source link

Create a downloader component separate from the stages #764

Closed onbjerg closed 1 year ago

onbjerg commented 1 year ago

Describe the feature

Currently we have two downloaders: a concurrent one for bodies and a linear one for headers.

Ideally, both downloaders would be concurrent. They have some shared logic:

Both of the downloaders also have retry logic and peer penalization.

Both downloaders also have their own issues:

These issues lead to not saturating the network properly: each downloader, at some point, is not requesting as much data as it could, for various reasons. This leads to slow sync times for online stages.

To address these issues, the plan is now to create a downloader component that lives outside of the stages, with a channel to communicate with the stages.

High level plan

  1. Create a new downloader component that downloads batches of headers and bodies optimistically
  2. The stages should not build requests for bodies or headers; they just ask the downloader for the next header or body

Point 2) also means that the downloader component must satisfy these invariants:

  1. Headers and bodies returned from the downloader component must sequentially connect and be valid according to the pre-validation rules we use now
  2. The downloader must pre-emptively download as many headers or bodies as it can given the information it has (within some limit)

Flow

  1. The downloader is instantiated alongside the pipeline
  2. The downloader is given the current fork choice state (the tip) and our current local head (if any)
  3. The downloader moves into an idle state
  4. On the first request for a header or body, it will start downloading
    1. If the request was a header, it will start downloading all the headers it can
    2. If the request was a body, it will start downloading all the bodies it can
  5. The stage that requested a header or body will either:
    1. Get the data
    2. Be told that we reached the tip (i.e. there is no more data for now)
    3. Be told that we've re-orged (see reorg section)
    4. Be notified that some super fatal error occured (see error handling section)
  6. If data was received in the previous step, that piece of data is removed from the internal buffer of the downloader
    1. Data is assumed by the stages to be in order and valid, so the stage should just insert the data into the transaction. If no more data is available, the stage should commit whatever it has written to the transaction already.

The downloader MUST not download headers and bodies at the same time, see the concurrency section.

It is up to the stages to control how much data to request, i.e. it is OK for stages to only ask for e.g. 1000 headers and commit.

Concurrency

The bodies are already downloaded concurrently and the headers will follow the same general idea:

For a range of block a to b, slice the range into n batches and request each batch from idle peers in the set, at most c at a time. Note that the batch size for headers and bodies should be different because of the fundamental constraint for request size being the message size limit of devp2p.

For headers to be downloaded concurrently, we have to request headers by block number instead of block hash.

There was some thought about downloading headers and bodies optimistically at the same time (i.e. as soon as a header has been downloaded, download the corresponding body), however this does not make a lot of sense for two reasons:

Validation

For headers:

For bodies:

Note that the validation for bodies can be expensive

Configuration

The following parameters at least must be configurable:

  1. The size of the internal buffer for headers (i.e. how many headers to have in memory at most at any time)
  2. The size of the internal buffer for bodies (i.e. how many bodies to have in memory at most at any time)
  3. The concurrency of the downloader (i.e. the maximum number of requests to have in-flight at any time)
    1. This could technically be adjusted dynamically with info from the networking component (more peers -> higher concurrency)
  4. The batch size of the requests for headers (i.e. how many headers to at most ask for from a peer per request)
  5. The batch size of the requests for bodies (i.e. how many bodies to at most ask for from a peer per request)

Client vs downloader

The downloader sends requests to a client. For most of sync, this client will be the FetchClient in the networking component, which forwards requests to peers.

The reason the client is separated from the downloader itself is so it can be swapped; this is particularly important for post-merge syncing where syncing from a consensus client via the engine API is more appropriate - it is still possible to ask execution client peers for block data, but this will lead to us always being a bit behind.

Post-merge download

The consensus client will post new blocks to us via the Engine API. These new blocks (called payloads) should be kept in-memory by some component (TBD) until the consensus client sends a new forkchoice state. When the forkchoice state is sent, we need to figure out what payloads are now on the canonical chain and which we can discard. The new canonical blocks are kept in a buffer for the downloader to pick from later.

I propose that this buffering mechanism between EL/CL is kept in a engine API-specific downloader client. At some condition, we switch between the P2P client and this engine API client. This switching logic can be handled in a client that wraps both the P2P client and the engine API client.

Error handling

The downloader should internally handle timeouts, invalid responses, and retries. These should not propagate to the stages themselves.

The only errors that should propagate to the stages themselves are fatal errors from which the downloader can never recover. This will mean that the stage will block until it gets more data, however, this is fine since the pipeline cannot meaningfully progress without the requested data.

Re-orgs

The downloader should keep track of whether a re-org occurred or not and communicate this to the stages. For beacon consensus this would occur when a fork choice state is sent from the consensus layer to us and the new tip does not connect directly onto our local head.

The downloader should emit an event upon request of data containing:

Upon receiving this event online stages must unwind to the latest block we have that connects to the new tip in order to discard any data that is no longer on the canonical chain.

In my opinion this can be left for later when we have the engine API and are able to meaningfully test it

Additional context

Supercedes https://github.com/paradigmxyz/reth/issues/744, https://github.com/paradigmxyz/reth/issues/741 and https://github.com/paradigmxyz/reth/issues/391

mattsse commented 1 year ago

Create a new downloader component that downloads batches of headers and bodies optimistically

we're already doing this for bodies, right?

Can you elaborate on separate?

At this point I'm thinking we can achieve all that just by converting the downloader into a Stream type

onbjerg commented 1 year ago

we're already doing this for bodies, right?

No, they are not downloaded optimistically. The bodies stage asks for a range of bodies and then the current downloader will download only that range. If we are blocked on e.g. the first body, then we are spending a lot of time where we are not downloading anything. The main idea here is that the downloader will try to download as much as possible on its own, not only when the stages ask for a range of data.

Can you elaborate on separate?

In reference to what specifically?

mattsse commented 1 year ago

If we are blocked on e.g. the first body, then we are spending a lot of time where we are not downloading anything.

blocked by what exactly? Do you refer to the execute function of the BodiesStage? which only tries to download a fixed batch per invocation which takes as long as the longest response + valdition (worst case first request arrives last), hence preventing follow-up downloads. This could be fixed by turning the bodies downloader into a stream itself that automatically sends new requests once a response arrived.

In reference to what specifically?

separate to what: do we want to extract them out of the stages (Headers,Bodies)?

gakonst commented 1 year ago

This could be fixed by turning the bodies downloader into a stream itself that automatically sends new requests once a response arrived.

Hmm it sounds plausible that this may be addressable this way without a larger refactor? Do we still want to do the refactor as it should improve headers download speed as well?

onbjerg commented 1 year ago

Hmm it sounds plausible that this may be addressable this way without a larger refactor? Do we still want to do the refactor as it should improve headers download speed as well?

I would want the refactor to be done, it makes sense to merge the downloaders into one since they are so similar. It also makes sense to lookahead and download as much as possible.

If we just turn the bodies downloader into a stream we would likely still have the same issues I think? In the sense that the stage will request what it needs, no more. If we want it to request more than it needs, then there is a lot of additional logic needed in the stage and it ends up not really benefitting us in that sense

separate to what: do we want to extract them out of the stages (Headers,Bodies)?

I'm still not entirely sure what you mean. If you are referencing this line:

"The reason the client is separated from the downloader itself is so it can be swapped; "

Then what I mean is that the downloader component itself takes some trait Client (like the current downloaders do) so we can swap out the current FetchClient, that talks to P2P, to an EngineClient that talks to CL.

blocked by what exactly? Do you refer to the execute function of the BodiesStage?

Slow peers.

This could be fixed by turning the bodies downloader into a stream itself that automatically sends new requests once a response arrived.

See above: we request a range of blocks 0..1000 divided into, say, 10 batches. The bodies stage does not request more than it needs, and I don't think it should even if it could; it just seems a bit leaky?

If the last 9 batches arrive, and the first batch is blocked on a slow peer, then naturally we should repurpose the fast peers to download all the other batches. If the downloader component was separate, it could:

  1. Download up to buffer_size bodies concurrently from as many peers as possible
  2. If a batch is slow, no problem, we just keep requesting other batches from other peers until the buffer is filled.

In the current case, each time the bodies stage is executed, we will probably end up talking to the slow peer, which means that if the peer takes 20 seconds to respond, that's 20 seconds wasted for every commit_threshold number of bodies. In the case described above, this would be minimized since the other good peers would get more requests, so the total amount of wasted time "per batch" is lowered.

Hopefully that makes sense?

mattsse commented 1 year ago

If we just turn the bodies downloader into a stream we would likely still have the same issues I think?

in Stage::execute we know the endblock right? this is what we should use for the downloader, so even if the stage only fetches commit_threshold blocks the downloader can still send a followup request for every response it yields to the stage

onbjerg commented 1 year ago

Ok I think I understand now, basically we would have a long-lived stream with an internal buffer and e.g. the bodies stage would:

  1. Hint what the latest header is
  2. Hint what block we downloaded last
  3. Poll the stream up to a max of commit_threshold times

And the stream would:

  1. On each poll, if the internal buffer is less than the max size, request bodies concurrently until the buffer is full or we downloaded up to tip
  2. Of course, once a body is returned, it is removed from the buffer

(and all the other stuff like validation, making sure bodies are returned sequentially etc)

gakonst commented 1 year ago

Closed, we now spawn our downloader tasks #1055 #958