digidem / mapeo-core

Library for creating custom geo data and syncronizing via a peer to peer network
23 stars 2 forks source link

Peer RPC protocol #81

Open hackergrrl opened 4 years ago

hackergrrl commented 4 years ago

While trying to add information to our peer handshake about sync state, I've been noticing the limitations of our current sync protocol.

Our sync protocol right now looks like this:

P1                               P2
|                                 |
| handshake-request -------->     |
|                                 |
|    <-------- handshake-request  |
|                                 |
|                                 |
|  handshake-accept -------->     |
|                                 |
|    <-------- handshake-accept   |
|                                 |
|                                 |
|   <---- hypercore sync ---->    |
|                .                |
|                .                |
|                .                |
|                .                |
|                                 |
|     x---- STREAM ENDS ----x     |

The protocol is very fixed in its sequence, unlike, say, HTTP, which allows arbitrarily-ordered and concurrent two-way communication.

I've been rereading the scuttlebutt protocol docs for ideas. SSB uses an RPC mechanism for all data exchange. Even the sync protocol works in terms of RPCs, rather than a special inner protocol like "hypercore sync" vs "hypercore extensions."

Something not unlike this would make our sync protocol much easier to extend. This could include several desired features, such as RPCs like:

This would also require putting hypercore into "sparse replication mode", to prevent the stream from terminating upon completion.

For now it would be easiest to implement these RPCs over hypercore extensions, but the nice part of RPCs is that from the code in mapeo-core it will just look like function calls, making it very easy to swap out if we change the medium of the RPCs in the future.

gmaclennan commented 4 years ago

Thanks @noffle, I like where this is going. The scuttebutt protocol docs are really helpful. It doesn't look that different to me to the Hypercore / Dat protocol, which looks kind of like RPC but the messages can happen in any order.

As far as I understand it, when you "register an extension" what you are doing is registering send and receive handlers for RPC messages sent in the stream. And the "sync" is not a special inner protocol as such, just a sequence of RPC calls saying what data they have, what data they want, and requesting and sending the data: https://datprotocol.github.io/how-dat-works/#exchanging-data. It's good to see that, from what I see, the inner hypercore protocol has remained stable without breaking changes, and the changes have been in the handshake / encryption - I had been worried that it had been more unstable than this.

It looks like the additional benefit that the hypercore protocol "offers" is encoding each RPC call / message to a particular channel / hypercore which means that multiple hypercores can communicate over the same stream, although I don't think we use this for multifeed at this time right? The "How Dat Works" document says that this is limited to 127 channels, which is too small for us, but I don't see any limit in the code. I wonder if this limit is still in place and why it was limited? I guess to reduce the number of bytes sent over the stream, since increasing it adds bits to every message.

On a related note to this have you seen https://www.npmjs.com/package/corestore which seems to solve the same problem as multifeed?

okdistribute commented 4 years ago

How dat works is based on the older version of hypercore-protocol & hypercore, which may have been different than the latest, so it's possible that it is no longer limited to only 127 channels.

We should talk about corestore monday, there are lots of behind the scenes discussions on this

hackergrrl commented 4 years ago

This is being implemented currently on the rpc branch.

On a related note to this have you seen https://www.npmjs.com/package/corestore which seems to solve the same problem as multifeed?

Yes.

gmaclennan commented 4 years ago

Thanks @noffle, I've been reading through the rpc branch and the implementations/protocols of muxrpc (and packet-stream), multiplex and simple-message-channels.

First of all, this is really complex stuff, and I'm only just getting my head around all the work you have done so far and understanding the complexity of it. I think this is a really important part of our whole technology stack and it's important that we get it right, and I really appreciate more than ever your work on this to get it to this place.

I've been trying to figure out how to talk about this part of our stack and get clear in my mind what is actually happening, and whether we are making the best decisions in terms of implementation so that we don't run into challenges further down the road. It would be amazing if we were able to brainstorm this in person with a whiteboard, but given the impossibility of that right now I'd love to get your thoughts on whether I understand things correctly, and share some of my ideas.

I use the term "mapeo protocol" below to refer to the communication protocol between two mapeo peers. As far as I understand it, any protocol involves sending and receiving messages over a socket. Node streams abstract away some of the details of TCP (or other transport) networking, and will chunk and send data as needed. Any protocol over a socket needs to know how to reconstruct messages that might arrive in several chunks, and all of the libraries I mentioned above seem to do this in one way or another, normally with length-prefixed messages.

Each message over the socket needs to be handled by a different part of our stack. As I understand it, there are two ways of deciding how a message should be handled: by stream state or by framing the message.

handshake stream uses stream state to decide how messages are handled: before the handshake is complete messages are handled within the handshake, and once handshake is complete messages are handled by the rest of the stack. The noise protocol negotiation in simple-hypercore-protocol works in the same way: while the stream is in "handshaking" state, messages are handled by simple-handshake and when the handshake is complete messages are passed through to hypercore.

The other way of directing messages is framing the message, and it seems like the common way to do this is to add some kind of ID to each message and uses that id to direct the message to the right handler (e.g. multiplex uses this to direct messages to the right stream, hypercore uses it to direct messages to the right hypercore when multiple hypercores communicate over the same stream.

Hypercore protocol does this framing in simple-message-channels. The ID for each message is split into two parts: a channelId and a type. The type defines the handler, and the channelId allows multiple streams to communicate over a single stream.

multiplex seems to do something similar but at a lower level: it acts on chunks within a stream, and frames each one with a channel id (and a message length, so the receiver can re-organize chunks if the chunking was changed over the socket).

packet-stream (used by muxrpc) is also doing something similar, framing each message with an id that it uses to direct the message. Messages in a stream are all sent with the same id, and each RPC has a unique id.

As far as I understand the rpc branch currently, it combines all three of these mechanisms, that means that the actual wire protocol (the data being sent on the socket) is quite complex:

During the initial handshake, the messages are sent with packet-stream-codec and the wire protocol format seems to be roughly described in the readme, with each message framed with a header containing flags, length, req. The message body encodes a messageId so that messages can be handled by the corresponding method on each side of the connection.

After the handshake, data goes over multiplex, which does its own framing. I don't fully understand how multiplex is chunking messages and reconstructing them, so I don't know if it's framing chunks as written to the stream (which don't necessarily correspond to messages being sent over the stream) or framing entire messages. Messages are framed with a channelId and varint length.

As far as I understand it, this means that over the wire, once data is going through multiplex, messages are framed twice, once by multiplex and again by either muxrpc or hypercore.

My concern about all of this is that it results in a complex wire protocol which is a mash up (and dependent on) three different protocols: multiplex, muxrpc and hypercore, all wrapping each other. Unfortunately the actual protocol of multiplex is not well documented, and muxrpc does not document the wire protocol that clearly either. I'm worried that this kind of ties us to all three of these libraries for the mapeo protocol communication - the messages sent over the wire are framed multiple times by each of these libraries, and the format depends on all of them. It seems unlikely (?) that the protocol of either muxrpc or multiplex would change, but it would be hard for us to switch to our own implementation and keep the same wire protocol.

Having understood a little about what all these different libraries are doing, the approach in simple-message-channels (SMC) makes more sense. I'm wondering if it would make sense to use the same approach for the whole Mapeo protocol, relying on the channels of SMC to do the multiplexing (like you do currently in multifeed).

We could define the mapeo protocol with our own types of messages (SMC supports up to 15, plus extensions) and specify that channel 0 is reserved for mapeo protocol messages. Then we can pass the same stream to multifeed + hypercore and each hypercore will communicate over its own channel. We could re-write blob-replication-stream to work over RPC messages sent using SMC, or we could write a simple stream multiplexer that encodes stream messages to a specific SMC channel + type.

What I'd like to get to is a place where we can clearly say how messages in the mapeo protocol are encoded and what those messages are. We might be able to do that with muxrpc, but I'm a little wary of mixing all these protocols / message framing together.

I may have misunderstood a few things here, and I may be heading in the wrong direction entirely. I'd be interested to hear thoughts and ideas.

okdistribute commented 4 years ago

It would help me to follow along (and likely also other app developers/outside contributors) if there was a state/process/swimlane/network sequence diagram of some sort and then iterate upon that visually. I think this would also be helpful for newcomers who want to contribute to have a quick idea of what components are called in what order

hackergrrl commented 4 years ago

I shared this a few weeks ago in #ddem: https://gist.github.com/noffle/765a47ccbed6c88b3b0313ea81fe633e

It's changed a bit though, so instead we'll have an open RPC channel, but open substreams for a) multifeed, and b) blob-store-replication-stream.

hackergrrl commented 4 years ago

I really like your notes & ideas @gmaclennan. I really like how simple SMC is. Having a protocol that's super simple would benefit us in many ways: debuggability, teachability, extensibility, and likely fewer bugs. Here are my notes & questions:

gmaclennan commented 4 years ago

Q: you mentioned SMC supports 15 types + extensions, but I don't see extensions mentioned anywhere in that repository. We could implement space for more types ourselves (I think we'll need it) by either having a special type that means "this is followed by a varint with the real type" or by forking (ideally PRing, but not sure if we can /wo breaking everybody else) SMC to have more bits for type (or use a varint).

Yes that was confusing the way I said it, extensions are not related to SMC. Hypercore uses abstract-extension and uses the options message type to pass a list of supported extensions, and passes extension messages with type 15. This is useful in order to support certain messages that are supported by some clients but not others, in a way that remains backwards compatible. I'm not sure it is something we need, because we are not forseeing others using Mapeo as a building block for other projects (yet).

Q: you mentioned passing in our own SMC stream into hypercore for it to use. Is this possible? It looks like simple-hypercore-protocol creates & manages its own SMC instance and cannot be substituted in. This will mean we still end up double-framing hypercore messages (our SMC plus their SMC), but it looks like it'd be much easier to understand vs the multiplex and muxrpc protocols.

I think there are two approaches to this:

  1. Hypercore can re-use the same stream to replicate multiple hypercores, with each hypercore passing messages on its own channel - that's how I understand multifeed works. I don't understand the hypercore code that decides the channel id for each feed though. We would need a way to reserve a channel for "our" messages. Once we had that, one solution would be deplux stream that reads incoming SMC messages, if they are with "our" channel id, then handle them, if not them re-encode and write to the hypercore/multifeed replicate stream. In the other direction it can be a passthrough stream from hypercore/multifeed that we would also write messages to. This would have the overhead of decoding messages twice (we would decode, check the channel id, then re-encode for hypercore which would then decode) but I don't think that is a very large overhead.

  2. The other way is what you suggest with "double framing" - we say "messages of type X are hypercore messages", then we read each message from the stream and if it is type X then we write it to the hypercore/multifeed replication stream. In the other direction we receive a message coming from hypercore/multifeed then wrap it in another SMC. (let me know if I'm not being very clear here). I like this option because then we could use the "channelId" in the future to support syncing multiple projects over the same connection (if needed) - one channel for each project.

  • We'd need to implement our own RPC mechanism for request/response style RPCs, as well read-stream style RPCs.
  • As I understand it, this doesn't solve the problem of streaming data. SMC operates on single fixed-length messages. While one message is transmitting, no others may be transmitted. It blocks not just the channel, but the whole SMC machine. Streaming multiplexed data is required for sending large objects like media and the app upgrade binaries themselves. We'd need to have another framing mechanism inside of SMC messages (at least for types we want to represent streams) to encode which active stream the data is meant for, and a way to signal errors. This is Work(tm), but it might be worth it to have a very explicitly clear & documented minimal protocol that we feel good about using & reusing long-term.

Yes good point, I had not thought about streaming data, although SMC messages can be any size - the message uses a varint length prefix. I'm surprised to see that hypercore does not seem to chunk / stream data - from what I see it sends each block of data from a feed as a single message. The spec for hyperdrive suggests that files are split across several feed data blocks/chunks in the hypercore, but it looks like that is not implemented yet. It's not a deal for small chunks/files - it just means memory use can be high with large files (e.g. writing the whole file as a single message). Since we're targeting mobile I agree streaming/chunked sending is important.

gmaclennan commented 4 years ago

Just to throw out some ideas about how to implement this as I my brain is bubbling away thinking about this, some potential message types:

// type=0 RPC Request
{
  messageId: number,
  methodName: string,
  arguments: Array<JSONValue>
}

// type=1 RPC Object Response
{
  messageId: number,
  error: string | null,
  data: JSONValue | null
}

// type=2 RPC Data Response
{
  messageId: number,
  data: Buffer,
  continued: boolean
}

// type=3 Hypercore Message
// Raw buffer, message passed through to replicate() stream

// type=4 Blob store message (could be hypercore)
//...

Implementation-wise, requests are sent with incrementing messageIds, and responses are sent either as an object response or data response (these could be the same type, but having two might make encoding more efficient). For streamed responses we would chunk the data and send in separate messages, using something like block-stream2. The little experiment I was working on rpc-reflector has one way of implementing RPC with basic tracking of messageIds.

There are various options for how we encode these messages into a buffer:

Something like protocol buffers would minimize overhead for sending large amounts of binary data e.g.

message StreamResponse {
  required messageId uint64 = 1
  required data bytes = 2
  optional continued bool = 3
}

I think the goal here would be to have a well defined wire protocol format for Mapeo, such that it would be possible to change our implementation but keep the protocol, or for someone to write a Mapeo client in another language like Rust - not that that's something we should do right now, but I think it will future proof the work.

hackergrrl commented 4 years ago

I prefer the double-framing approach 100% over trying to wedge ourselves into hypercore's protocol, which I think would be very fragile and prone to break if hypercore changed its protocol or decided to ignore our attempts to reserve one of its channel. Keeping the protocol layers separate feels important to minimize breaking changes.

For the RPC framing, I'm in favour of a schema-ful binary format like protobuf, because a) having a very concise schema means minimizing introducing bugs, and b) minimizing the frame size on each message.

gmaclennan commented 4 years ago

Hi @kira yes having reflected on this I agree with you.

I think I understand a bit more now about why SMC is designed the way it is: channels are encoded as a varint to support an unlimited number of hypercores syncing over the same stream. The type id is there so that different message types can have different encodings. SMC also does length-prefixed encoding so that messages can be decoded.

After experimenting with fixed-channel-multiplexer I'm not sure we need the complexity of SMC. I think we just need what fixed-channel-multiplexer does, adding a channel/type to each message so that it can be dealt with in its own way. I don't think that we need varints either, because I think if we just used a single byte to store the channel/type 255 would be enough, and that would simplify the code and also enable us to postfix the message with channel/type rather than prefix, which reduces the amount of buffer copying needed. SMC also does a bunch of clever stuff to encode types to only 4-bits (as opposed to a single byte) to really save space, which might be important for us to consider since we would be framing all hypercore messages with this.

That way we can use that channel type to either direct the message, or decode/encode it, e.g. we assign 0 to hypercore messages, which are passed through to hypercore, 1 to blob stream messages, and the rest to RPC message types. I've also been looking at MessagePack and I think it is a good option because it does not require specifying a schema, it's basically a more compact JSON.stringify() that also supports encoding buffers. Protobufs would be more efficient (smaller), but if we are not sending many RPC messages we should weight the performance increase vs. code complexity. If we used MessagePack then all RPC messages could just be on channel 2 for example, and they could encode a message type as a string (e.g. method name) or a number as needed. I've been doing a lot of experimenting with RPC protocols on rainy weekends from my work on https://github.com/gmaclennan/rpc-reflector so happy to bounce ideas around on that.

gmaclennan commented 4 years ago

One data point that I think is relevant to this discussion is bandwidth overhead related to how many messages hypercore is sending. E.g. if we have a mapeo-core database with 100,000 blocks (observations, nodes, etc) then how many messages are sent during sync? If each block is sent in a separate message, and there are also ACK messages sent, then how we do the framing might be relevant, e.g. if we prefix a length prefix + channel identifier to each hypercore message, and that takes 2 bytes, then that could add 200kb+ to the sync. I think that feels pretty low, so probably ok, but it would be good to get some real numbers and what percentage of bandwidth different framing options might add to a sync. It's not an issue for local sync, but for internet sync over slow connections it could become an overhead.

hackergrrl commented 4 years ago

and there are also ACK messages sent

Hypercore defaults to not sending ACKs, but each block is sent in its own DATA message.

I don't think the overhead will matter much: the average observation is around ~1.4kb. 100k observations is then around 143mb. An extra 200kb or even 1mb shouldn't make much of a difference. E.g. if someone had to wait 15 minutes for 150mb to transfer, 1mb of overhead adds ~1 second of additional transfer time. It could be interesting to snoop a hypercore sync session and see the median message length, but my feeling is that if our header is < 1% of the message size it should be ok.