paradigmxyz / reth

Modular, contributor-friendly and blazing-fast implementation of the Ethereum protocol, in Rust
https://reth.rs/
Apache License 2.0

Tracking: P2P #64

Closed Rjected closed 1 year ago

Rjected commented 1 year ago

P2P Networking proposal and tracking

RLPx Peer Connection

The RLPx peer connection should implement framed encryption as specified by RLPx.

Client

The ECIES transport should implement AsyncRead and AsyncWrite, so the p2p connection can use it. Long term, it would be nice to interact with framed encryption like this (similar to TLS libraries):

let tcp_conn = TcpStream::connect("127.0.0.1:30303").await?;
let rlpx = RLPxConnector::new(secret_key, peer_id)?;
let mut client: ECIESStream = rlpx.connect(tcp_conn).await?;

client.write_all(b"hello").await?;

let mut buffer = [0; 16];
let n = client.read(&mut buffer[..]).await?;

High priority tasks:

Lower priority:

Server

The RLPx Server should also work with any transport that implements AsyncRead and AsyncWrite. Longer term, it should be possible to serve RLPx like this:

let acceptor = RLPxAcceptor::new(secret_key);
let listener = TcpListener::bind(&addr).await?;

// roughly based off of the design of tokio::net::TcpListener
loop {
    let (stream, remote_addr) = listener.accept().await?;
    let client: ECIESStream = acceptor.accept(stream).await?;
    process_socket(client).await;
}

Low priority tasks:

p2p Peer Connection

The RLPx peer connection will contain a server and client portion, and will take care of capability negotiation (the p2p capability), pings, and disconnects. Both the client and server should be capable of working with a type that implements AsyncRead and AsyncWrite, meaning the transport does not need to implement framed encryption. This makes it slightly easier to test, since it would not require creating an ECIESStream.

Client

We should be able to do something roughly similar to the above designs, but we need an extra task to handle driving the p2p state. This is roughly inspired by the design of hyper's client::conn.

// it would be nice to have utilities to properly generate a `Hello` message
let hello = Hello { ... };

let tcp_conn = TcpStream::connect("127.0.0.1:30303").await?;
let p2p_config = P2PConfig::new(hello);
let (mut client, p2p_state): (P2PStream, _) = p2p_config.handshake(tcp_conn).await?;

tokio::spawn(async move {
    if let Err(e) = p2p_state.await {
        println!("Error! maybe ping error, etc: {}", e);
    }
});

// just an example, real messages are likely to be much more concrete
let message: Message = MessageBuilder::new()
    .with_hello(hello)
    .request_id(0xf)
    .message("hello");

// todo: may want to consider a message size limit on read
// ECIESStream can implement AsyncRead/AsyncWrite, but `p2p` changes the interface from working mainly with bufs to also including a message id
// luckily the stream after being decrypted should have a header that tells us the length
client.send(message).await?;
let peer_message = client.read().await?;

Tasks:

Server

This is roughly inspired by the design of hyper's server::conn.

let mut listener = TcpListener::bind(&addr).await?;
let server = P2PServer::new(hello);
loop {
    let (stream, remote_addr) = listener.accept().await?;
    let mut client: P2PStream = server.accept(stream).await?;
    process_message(client).await;
}

Tasks:

eth Peer Connection

This contains a client and server, just like the RLPx and p2p connection. Instead of a p2p handshake, both a p2p and eth handshake are performed.

Client

// would also be nice to have a sensible way to generate `Status` messages
let status = Status { ... };

let tcp_conn = TcpStream::connect("127.0.0.1:30303").await?;
// this should create a `P2PConfig` under the hood
let eth_config = EthConfig::new(hello, status);
let (mut client, p2p_state): (EthStream, _) = eth_config.handshake(tcp_conn).await?;

tokio::spawn(async move {
    if let Err(e) = p2p_state.await {
        println!("Error! maybe ping error, maybe disconnect, etc: {}", e);
    }
});

// since we are in a subprotocol now we can create a more concrete message
let get_pooled_transactions: GetPooledTransactions = vec!["0x7cab7b2c72d1c6cc8539ee5e4b8af9b86a130d63b1428c2c52c4454ead266ff4"].into();

// TODO: should the client impl just take care of choosing request ids and abstract out multiplexing?
let pooled_transactions = client.send_request(get_pooled_transactions).await?;
let mut hashes = client.stream_hashes().await?;
// if there were another client its stream could be joined
while let Some(hash) = hashes.next().await {
    // ...
}

Tasks:

Server

The eth server will wait for incoming connections and stream incoming messages.

let mut listener = TcpListener::bind(&addr).await?;
// this call should make sure that the `Hello` message includes `eth` as a capability
let server = EthServer::new(hello, status);
loop {
    let (stream, remote_addr) = listener.accept().await?;
    // this should create a `P2PServer` under the hood
    let mut client: EthStream = server.accept(stream).await?;
    process_message(client).await;
}

Tasks:

eth Wire Protocol

NOTE See: https://github.com/rjected/ethp2p

We need to provide RLP encoding and decoding for each of the eth protocol messages and each of the types they contain.

Once this is done, we can also create a Message type that is the sum type of all the protocol messages.
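
A minimal sketch of what this could look like, assuming a derive-based RLP crate along the lines of fastrlp; the message names and fields below are illustrative, not the exact eth/66 wire format:

use fastrlp::{RlpDecodable, RlpEncodable};

// One request type from the eth protocol (fields simplified for illustration).
#[derive(Debug, RlpEncodable, RlpDecodable)]
pub struct GetBlockHeaders {
    pub start_block: u64,
    pub limit: u64,
    pub skip: u64,
}

// The sum type over all eth protocol messages; each variant carries its decoded
// payload and maps to the corresponding message id on the wire.
#[derive(Debug)]
pub enum Message {
    GetBlockHeaders(GetBlockHeaders),
    // ... BlockHeaders, NewBlockHashes, Transactions, GetPooledTransactions, etc.
}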

eth Network abstraction

Finally, the Network abstraction will integrate all of the above components to provide a reliable connection to a set of peers on the network. This will implement interfaces that other components (e.g. staged sync, txpool) will use.

The network abstraction will also integrate with discovery mechanisms and manage the discovery state. This will act as a Server (an eth server) if the user is able to accept incoming connections and will use discovery methods to create outgoing connections, which will yield a set of EthStreams.

Current questions

How will we implement the Engine API? It prompts the execution layer to request blocks, which happens over eth p2p. It could also just use the API provided by the Network abstraction.

gakonst commented 1 year ago

Long term, it would be nice to interact with framed encryption like this (similar to TLS libraries):

Love the API proposed that uses AsyncRead/Write.

Longer term, it should be possible to serve RLPx like this:

Same Q as above re: server refactor, if we do it from scratch, do you think we could build directly for such an API?

This makes it slightly easier to test, since it would not require creating an ECIESStream. Once this is done, we can also create a Message type that is the sum type of all the protocol messages.

Great!

// it would be nice to have utilities to properly generate a Hello message

Do you mean like builder pattern? We can either do it manually, or for simple structs use something like https://docs.rs/derive_builder/latest/derive_builder/. Same for your Status comment later.
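
For concreteness, the derive_builder route could look something like this sketch; the Hello fields, Capability, and PeerId here are placeholders, not the actual reth types:

use derive_builder::Builder;

// Placeholder types for illustration only.
type PeerId = [u8; 64];

#[derive(Debug, Clone)]
pub struct Capability {
    pub name: String,
    pub version: u8,
}

#[derive(Debug, Clone, Builder)]
pub struct Hello {
    protocol_version: u8,
    client_version: String,
    capabilities: Vec<Capability>,
    listen_port: u16,
    peer_id: PeerId,
}

// usage: the derive generates a `HelloBuilder` with one setter per field
let hello = HelloBuilder::default()
    .protocol_version(5)
    .client_version("reth/v0.1.0".to_string())
    .capabilities(vec![Capability { name: "eth".to_string(), version: 66 }])
    .listen_port(30303)
    .peer_id([0u8; 64])
    .build()?;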

// todo: may want to consider a message size limit on read // ECIESStream can implement AsyncRead/AsyncWrite, but p2p changes the interface from working mainly with bufs to also including a message id // luckily the stream after being decrypted should have a header that tells us the length

Didn't exactly understand what you mean here. cc @mattsse who's had thoughts on p2p.

let p2p_config = P2PConfig::new(hello);

Why does hello go into `P2PConfig`?

// TODO: should the client impl just take care of choosing request ids and abstract out multiplexing?

Any reason not to? Any tradeoff here?

// if there were another client its stream could be joined

Makes a lot of sense.

https://github.com/ethereum/devp2p/blob/master/caps/eth.md

^This is a sick doc.

Once this is done, we can also create a Message type that is the sum type of all the protocol messages.

Yep, makes sense. How does Akula do it?

This will implement interfaces that other components (e.g. staged sync, txpool) will use.

How do you envision this to be exposed? Any changes you'd make vs how Akula does it, or how the HeaderClient trait has been exposed so far in #58?

The network abstraction will also integrate with discovery mechanisms and manage the discovery state.

What kind of discovery mechanisms?

How will we implement the Engine API? It prompts the execution layer to request blocks, which happens over eth p2p. It could also just use the API provided by the Network abstraction.

So looking at Akula's implementation of the EngineApi, it doesn't seem like they have logic for that? It delegates everything to the Sentry implementation? The stuff here also looks interesting. I would encourage us not to copy the code blindly, and think from first principles on how to do that, as you've said.

Sorry, I misunderstood. Do you mean exposing the engine_* endpoints over an RPC server? In that case, Akula exposes this over JSON RPC to the CL, which we also will as shown here.

This proceeds to send a new fork choice to the Headers Stage receiver which will get consumed like this. Effectively CL calls Engine API, which triggers new HEAD which triggers EL to send HeaderRequest to all its peers, and then download the headers.

Does that make sense?

mattsse commented 1 year ago

this is a great summary @Rjected

Implement AsyncRead ...

perhaps this is not even necessary if we have an ECIESStream codec (https://docs.rs/tokio-codec/latest/tokio_codec/) and use the Framed adapter (https://docs.rs/tokio-codec/latest/tokio_codec/struct.Framed.html), which automatically gives us AsyncRead+Write
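
Rough illustration of the codec idea, using the maintained tokio_util::codec version of the linked tokio-codec API; the codec name and internals here are hypothetical:

use bytes::{Bytes, BytesMut};
use tokio_util::codec::{Decoder, Encoder, Framed};

// Hypothetical codec holding the per-connection ECIES/MAC state.
struct EciesCodec { /* ingress/egress AES + MAC state */ }

impl Decoder for EciesCodec {
    type Item = BytesMut; // one decrypted frame body
    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        // 1. need a full header first to learn the frame length
        // 2. if the whole frame + MAC is not buffered yet, return Ok(None) to ask for more bytes
        // 3. verify the MACs, decrypt in place, and return the frame body
        let _ = src;
        unimplemented!("sketch only")
    }
}

impl Encoder<Bytes> for EciesCodec {
    type Error = std::io::Error;

    fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // encrypt `item`, append the header/frame MACs, and write the framed bytes into `dst`
        let _ = (item, dst);
        unimplemented!("sketch only")
    }
}

// `Framed` then turns any `AsyncRead + AsyncWrite` transport plus this codec into a
// `Stream`/`Sink` of whole decrypted frames:
// let framed = Framed::new(tcp_conn, EciesCodec { /* ... */ });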

Server

this pattern is pretty common:

  1. loop incoming
  2. spawn (peer) connections in new task: use shareable context/channels to communicate

Client

this is probably similar to the server, but reversed.

for the tech stack, we should look closely at tower's Service trait: https://docs.rs/tower/latest/tower/trait.Service.html which is a request -> response function.
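
A sketch of how a per-peer request/response client might fit that trait; all the types below are hypothetical placeholders:

use std::{future::Future, pin::Pin, task::{Context, Poll}};
use tower::Service;

// Hypothetical request/response types for a single peer connection.
struct PeerRequest;
struct PeerResponse;

// Handle that forwards requests to the task driving the actual connection.
struct PeerClient;

impl Service<PeerRequest> for PeerClient {
    type Response = PeerResponse;
    type Error = std::io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // backpressure hook, e.g. limit the number of in-flight requests per peer
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: PeerRequest) -> Self::Future {
        let _ = req; // would assign a request id and send it to the connection task
        Box::pin(async move {
            // resolve once the matching response comes back; stubbed out in this sketch
            Ok::<PeerResponse, std::io::Error>(PeerResponse)
        })
    }
}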

re @gakonst qs

Is there any limitation wrt what async runtime we'll need to use?

not really, [Framed](https://docs.rs/tokio-util/0.2.0/tokio_util/codec/struct.Framed.html) for example is an adapter over Codec + AsyncRead|Write (which could be io for example) which then implements Stream+Sink with a message type as item.

For Sink it's basically: Item -> Encoder -> Bytes -> AsyncWrite, and Stream in reverse: AsyncRead -> Bytes -> Decoder -> Item

Rjected commented 1 year ago

What is the biggest blocker in terms of doing the refactor? What do you think about completely stripping out devp2p-rs as it is right now, and starting a clean write following the APIs you want?

No blockers, just hoping to get some input on the designs. A clean write would definitely let me build these APIs.

If we do it from scratch, do you think we could build directly for such an API?

Yes

Do you mean like builder pattern? We can either do it manually, or for simple structs use something like https://docs.rs/derive_builder/latest/derive_builder/. Same for your Status comment later.

Interesting, that would be useful. Yeah, I was referring more to something like the builder pattern.

Didn't exactly understand what you mean here

Since we need to handle the Ping and Pong messages, we need to parse all incoming messages and determine if it belongs to the p2p capability (p2p reserves message IDs 0x00-0x0f). We could still provide an interface similar to AsyncRead / AsyncWrite, but it might make more sense to return something that also contains the message ID parsed if it was not part of the p2p capability.
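
Roughly, the demultiplexing could look like the sketch below; the reserved-id range matches the devp2p spec, everything else is illustrative:

use bytes::Bytes;

// p2p reserves ids 0x00-0x0f: Hello = 0x00, Disconnect = 0x01, Ping = 0x02, Pong = 0x03.
const PING: u8 = 0x02;

enum Incoming {
    // A message for a negotiated subprotocol, with the id normalized into that
    // capability's message space.
    Capability { id: u8, payload: Bytes },
}

fn handle_frame(id: u8, payload: Bytes) -> Option<Incoming> {
    match id {
        0x00..=0x0f => {
            // p2p capability: handled internally, e.g. answer a Ping with a Pong,
            // tear down the stream on Disconnect; nothing is surfaced to the caller
            if id == PING { /* queue a Pong */ }
            None
        }
        // assuming `eth` is the only negotiated capability, its messages start at 0x10
        id => Some(Incoming::Capability { id: id - 0x10, payload }),
    }
}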

Why does hello go into `P2PConfig`?

Because a Hello message is necessary for the p2p handshake, similar to how Status is necessary for the eth handshake. Would anything else go into the config? Or do you think it's possible for a hello message to not be input to the constructor?

Any reason not to? Any tradeoff here?

I don't think there's any reason not to (unless anyone else can think of anything), especially if we provide a request/response API to other components.

How does Akula do it?

They have something similar here

How do you envision this to be exposed? Any changes you'd make vs how Akula does it, or how the HeaderClient trait has been exposed so far in https://github.com/foundry-rs/reth/pull/58?

Overall, I like the idea of splitting things up by interface (e.g. HeadersClient, maybe we have TransactionClient) and having the Network implement those interfaces. Akula has a more general send_message. We could implement this too, but maybe we can get away with most things using higher level, context-specific APIs.
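
As a sketch of the interface-per-consumer idea (the trait names follow the discussion above; the request/response types are placeholders, not the actual types from #58):

use async_trait::async_trait;

// Placeholder types for illustration only.
pub struct Header;
pub struct HeadersRequest { /* start block, limit, direction, ... */ }
pub struct Transaction;
pub type TxHash = [u8; 32];
#[derive(Debug)]
pub struct NetworkError;

#[async_trait]
pub trait HeadersClient {
    async fn get_headers(&self, req: HeadersRequest) -> Result<Vec<Header>, NetworkError>;
}

#[async_trait]
pub trait TransactionsClient {
    async fn get_pooled_transactions(&self, hashes: Vec<TxHash>) -> Result<Vec<Transaction>, NetworkError>;
}

// The network handle would implement both, so staged sync only needs a `HeadersClient`
// and the txpool only a `TransactionsClient`, while a lower-level `send_message`-style
// escape hatch could still exist for anything not covered by these traits.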

What kind of discovery mechanisms?

discv4 / discv5 / dnsdisc

Do you mean exposing the engine_* endpoints over an RPC server? ... Does that make sense?

Ah yeah that makes sense. The links are very helpful, thanks!

mattsse commented 1 year ago

ref https://github.com/emhane/discv5

mattsse commented 1 year ago

What we need

Discovery stack

Devp2p stack

Server half (incoming connections)

Client half (outgoing connections)

@Rjected a RLPx session is symmetrical, so there's no distinction between client/server peer, right? So outgoing + incoming connections should look the same.

Plugging everything together:

Looks like we have 3 big parts:

  1. udp stream -> yields discovered peers
  2. TCP stream -> yields incoming connections
  3. Outgoing connections: open connection to peer

2 + 3 should produce the same thing, a NodeSession?

Terminology

This can get confusing really quick.

Proposing some terms:

Swarm: Contains the state of the network and can be used to alter it (e.g. adding peers manually). This will essentially be the user-facing API.

Node/Peer: Any peer that implements the devp2p protocol

(Connection)Pool: manages a set of connections

struct Swarm {
    // A `Stream` type that produces "incoming connections"
    incoming,
    // Manages active nodes; basically just keeps track of background tasks for the
    // sessions. Since sessions are separate tasks, they should have a channel back to
    // the pool, so `pool <-> sessions` can exchange info, for example to notify when a
    // session has closed.
    pool,
}

// this is effectively the main event loop
impl Stream for Swarm {
    type Item = Event; // some event type this stream produces

    fn next() {
        // 1. poll the `incoming` stream, which also yields an event; something needs to
        //    handle the new connection and decide what to do with it, for example reject
        //    it (spammer etc.) or establish a connection
        // 2. the pool manages established connections: spawn a session with an attached channel
    }
}

these are just my initial ideas; peer sessions are probably pretty self-contained.

gakonst commented 1 year ago

makes sense

@Rjected a RLPx session is symmetrical, so there's no distinction between client/server peer, right? So outgoing + incoming connections should look the same.

yes

gakonst commented 1 year ago

Part 1, the encrypted connection is done in #80.

Rjected commented 1 year ago

Wire protocol complete in #82

Rjected commented 1 year ago

p2p component completed in #114. I think everything here will have been more or less accomplished once #110 is done.

mattsse commented 1 year ago

this has been completed.

Follow-up via individual issues on specific topics.