ipfs-shipyard / peer-crdt

Peer CRDT

fast multi-log sync #2

Open pgte opened 6 years ago

pgte commented 6 years ago

The setting

Even though peer-crdt is transport-agnostic, it makes some assumptions about the transport it uses underneath. Here are some of those assumptions:

The transport (peer-crdt-ipfs, for instance) is responsible for:

The problem

Even though it's very IPFS-y, this structure poses some performance challenges, namely around replication performance:

To replicate a set of log entries, the receiving replica has to pull them one by one. For each log entry received, it has to inspect it and get the parent CIDs. For each of those, it has to check whether the entry exists locally; if it doesn't, it has to fetch it from the IPFS network. And so on recursively, until all the missing log entries are replicated. This means a series of sequential lookups to sync the local log with a remote log, which is obviously very inefficient.
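A minimal sketch of that fetch-and-resolve-parents loop, just to make the sequential nature explicit (the `hasLocally` and `fetchEntry` helpers are hypothetical, not the actual peer-crdt-ipfs API):

```ts
type CID = string

interface LogEntry {
  cid: CID
  parents: CID[]
  payload: unknown
}

async function syncFromHead (
  head: CID,
  hasLocally: (cid: CID) => Promise<boolean>,
  fetchEntry: (cid: CID) => Promise<LogEntry> // one network round-trip per entry
): Promise<LogEntry[]> {
  const fetched: LogEntry[] = []
  const queue: CID[] = [head]
  while (queue.length > 0) {
    const cid = queue.shift()!
    if (await hasLocally(cid)) continue // already replicated locally
    const entry = await fetchEntry(cid) // sequential lookup
    fetched.push(entry)
    queue.push(...entry.parents) // only now do we learn the parent CIDs
  }
  return fetched
}
```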

(One optimisation is already in place: alongside the HEAD CID, a replica can also broadcast all the parent content ids, up to a count of MAX_PARENTS. This allows a replica to make a bunch of fetch requests in parallel. But if the number of missing log entries is greater than MAX_PARENTS, we're back to the very inefficient fetch-and-resolve-all-parents loop described above.)

Efficient replication requires knowing the remote replica state

I recently realised that this is inefficient because a replica doesn't know the replication state of another replica. If it did, it could send all the missing log entries in one burst.

This also happens because of content addressing: a CID is derived from hashing the block content, and so tells us nothing about the replication state of the remote replica.

We need a solution that allows a replica to know exactly which log entries a remote replica needs, so that it can send them immediately and in one burst.

One solution

Vector clocks. (If you are not familiar with VCs, this is a good explanation).
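As a reference for the rest of this proposal, here is a minimal sketch of the vector clock bookkeeping I have in mind (names are illustrative, not existing peer-crdt APIs): each replica bumps its own component whenever it appends an operation.

```ts
type ReplicaId = string
type VectorClock = Record<ReplicaId, number>

// Called whenever a replica appends a new operation to its local log.
function increment (clock: VectorClock, self: ReplicaId): VectorClock {
  return { ...clock, [self]: (clock[self] || 0) + 1 }
}

// Replica A appending O1, O2 and O3 produces the clocks {A:1}, {A:2}, {A:3}.
```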

Here I propose a solution where replicas keep track of the remote replica state, and push changes instead of expecting the remote to fetch them.

Here is, broadly, the main algorithm I propose:

Illustrated

A picture is worth a thousand words. Let's then illustrate how this could work with two replicas.

One writer, one reader

Replica A creates 3 operations, O1, O2 and O3. Replica B does not create any operation.

Our goal here is simply to keep replica B in sync with the changes created by replica A.

untitled drawing-28

Replica A starts by creating each log entry. Each log entry has a vector clock. For the last log entry, O3, the vector clock is {A: 3}.

Now replica B comes online. Since it has no entries, its vector clock is empty. It broadcasts its VC to the other listening nodes. Replica A receives that message, and now it knows about replica B's state.

fast-multilog-swap

Replica A realises that replica B's vector clock is smaller than its local vector clock. By doing some simple vector clock math, it realises that replica B is missing 3 log entries. It fetches those entries (O1, O2 and O3) and sends them to replica B, along with each log entry's corresponding vector clock, all in one burst.
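A rough sketch of that vector clock math, assuming a hypothetical local collection of entries tagged with their clocks (not an existing peer-crdt API):

```ts
type VectorClock = Record<string, number>

interface LogEntry {
  clock: VectorClock
  payload: unknown
}

// An entry is already known by the remote replica if every component of its
// clock is covered by the remote clock.
function knownBy (entry: LogEntry, remote: VectorClock): boolean {
  return Object.entries(entry.clock).every(([id, n]) => (remote[id] || 0) >= n)
}

function entriesToSend (localEntries: LogEntry[], remote: VectorClock): LogEntry[] {
  return localEntries.filter((entry) => !knownBy(entry, remote))
}

// With B's empty clock {}, replica A selects O1, O2 and O3 (clocks {A:1},
// {A:2}, {A:3}) and pushes them, plus their clocks, in a single burst.
```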

fast-multilog-swap-2

Now that replica B has all the missing log entries, both replicas have converged:

fast-multilog-swap-3

1 more writer

Now replica B makes some changes to the CRDT, creating 2 new operations (O4 and O5). It also updates the vector clock for each operation:

fast-multilog-swap-5

Now replica B's head is {A:3, B:2}. Because replica B knows that the replication state of replica A is {A:3}, it realises that replica A's vector clock is smaller than the current local vector clock. Because of that, replica B immediately realises it needs to send replica A some entries to update its state.

And so replica B calculates the difference between its own vector clock and replica A's vector clock. From this, it realises that replica A is missing 2 log entries with the following vector clocks: {A:3, B:1} and {A:3, B:2}, which correspond to operations O4 and O5. Replica B sends these entries to replica A:

fast-multilog-swap-6

Now, replica A has the missing operations, and is able to integrate them into its log:

fast-multilog-swap-7

The two replicas are in sync.

Two concurrent writers

Now let's see how our system can handle two concurrent writes, one done by each replica.

Let's continue with our example replicas:

Replica A creates a new operation and appends it to the local log. Since the VC for the previous head was {A:3, B:2}, the VC for the new operation (which we're calling O6) is going to be {A:4, B:2}.

At the same time, replica B also creates a new operation and appends it to the local log. Since the VC for the previous head was {A:3, B:2}, the VC for this new operation (which we're calling O7) is going to be {A:3, B:3}.

fast-multilog-swap-12

Now both replicas have different head vector clocks. Replica A's head VC is {A:4, B:2} and replica B's head VC is {A:3, B:3}.
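A small sketch of how a replica could detect this situation (illustrative helper names): neither head clock dominates the other, so the two operations are concurrent.

```ts
type VectorClock = Record<string, number>

// a dominates b when every component of a is >= the matching component of b.
function dominates (a: VectorClock, b: VectorClock): boolean {
  const ids = new Set([...Object.keys(a), ...Object.keys(b)])
  return [...ids].every((id) => (a[id] || 0) >= (b[id] || 0))
}

// Two heads are concurrent when neither clock dominates the other.
function isConcurrent (a: VectorClock, b: VectorClock): boolean {
  return !dominates(a, b) && !dominates(b, a)
}

isConcurrent({ A: 4, B: 2 }, { A: 3, B: 3 }) // true: O6 and O7 are concurrent
```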

Let's see what happens in each replica once each new operation is created.

Replica A:

Replica A has a head VC of {A:4, B:2} and knows replica B has a VC of {A:3, B:2}, which means it knows B is lagging behind. By doing some vector clock math it knows that replica B is missing one operation that corresponds to the VC {A:4, B:2}. It then fetches that operation from the operation log and sends it to replica B.

fast-multilog-swap-10

Replica B:

When it creates operation O7 (which has a vector clock of {A:3, B:3}) replica B knows that replica A has a VC of {A:3, B:2}, and is lagging behind. By doing some vector clock math it knows that replica A is missing one operation that corresponds to the VC {A:3, B:3}. It then fetches that operation from the operation log and sends it to replica A.

fast-multilog-swap-11


Now, each replica has all the operations in its log:

fast-multilog-swap-13

Convergence

But now neither replica has a single, defined HEAD operation. Since there was a divergence, each replica will have to create a merge log entry pointing to both parents. The vector clock for each of the merge entries will be the same: it will be the result of merging both vector clocks:

vc1 := {A:4, B:2}
vc2 := {A:3, B:3}
merged := merge(vc1, vc2) // {A:4, B:3}

The result of the merging operation is {A:4, B:3}, and that will be the vector clock for both merge log entries:
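A possible implementation of that merge (a sketch, not code from peer-crdt): the element-wise maximum of the two clocks.

```ts
type VectorClock = Record<string, number>

// merge takes the element-wise maximum of the two clocks.
function merge (a: VectorClock, b: VectorClock): VectorClock {
  const merged: VectorClock = { ...a }
  for (const [id, n] of Object.entries(b)) {
    merged[id] = Math.max(merged[id] || 0, n)
  }
  return merged
}

merge({ A: 4, B: 2 }, { A: 3, B: 3 }) // { A: 4, B: 3 }
```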

fast-multilog-swap-14

Now both replicas A and B are definitely in sync.

in sync

pgte commented 6 years ago

@diasdavid @vmx This solution is more specific than IPLD graph-swap, since it's tailored for the multi-log data structure, but I think we need to introduce vector clocks to keep track of remote state. What do you think?

pgte commented 6 years ago

Just watched @dominictarr's talk about scalable secure scuttlebutt (video here) — thanks @olizilla! — and there are some ideas that have parallels here:

pgte commented 6 years ago

Eager and lazy connections:

Turning the redundant connections from eager into lazy is a great idea, which I think we can apply here. I was worried about the redundancy of operation transmission when there are more than 2 replicas. I think we could solve this by making intelligent use of vector clocks: a vector clock transmits the information I know about; if a replica needs new information about a different replica and that specific replica is not in the network, it can explicitly ask another replica for it, instead of getting those operations pushed.

pgte commented 6 years ago

Optimizing vector clock transmission

Instead of broadcasting the entire vector clock to every peer, send only the vector clock differences to each peer individually.
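Something like this, for instance (a sketch, assuming each replica remembers the last clock it sent to each peer; not an existing API):

```ts
type VectorClock = Record<string, number>

// Only the components that advanced since the last clock sent to this peer
// need to be transmitted.
function clockDelta (current: VectorClock, lastSentToPeer: VectorClock): VectorClock {
  const delta: VectorClock = {}
  for (const [id, n] of Object.entries(current)) {
    if ((lastSentToPeer[id] || 0) < n) delta[id] = n
  }
  return delta
}

clockDelta({ A: 4, B: 3 }, { A: 4, B: 2 }) // { B: 3 }
```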

dominictarr commented 6 years ago

@pgte I've been trying to encourage ipfs peeps to use this protocol for a while! I'm glad you've seen this! Let's collaborate on a spec we can use in both ssb and ipfs! I am beginning to roll out support for this in ssb, but my aim is to make an implementation that is sufficiently flexible for other protocols to use. https://github.com/dominictarr/epidemic-broadcast-trees

I'd expect that ipfs will want to use a different encoding, but I figure if we can separate out the wire protocol, we can use a shared implementation of the behaviour. (Our reference implementation is JavaScript, but we also have a Go implementation underway or coming soon.)

daviddias commented 6 years ago

Hi @dominictarr o/ , good to see you around! :)

Pinging @vyzo, who has been working on epidemic broadcast trees as well

vyzo commented 6 years ago

I should point out that epidemic broadcast trees behave very poorly with multiple/random sources.

dominictarr commented 6 years ago

oh, I should probably make clear: in ssb we use ebt in the "protocol instance per source" mode. since you are also talking about replicating logs, I think we are in the same situation.

pgte commented 6 years ago

@dominictarr thanks so much for reaching out! This looks like it's just what is needed here, thank you for abstracting this into its own package! I'll take a look at the code and see how this can be plugged.

I wouldn't worry too much about the encoding as we're going to use multiplexed p2p switch connections which expose a raw stream, so we can use whatever.

I'll be taking a look at the code, getting more familiar with EBT, and trying to integrate it here. Sounds good?

dominictarr commented 6 years ago

@pgte am happy to answer any questions - just post issues on the epidemic-broadcast-trees module, I guess.

vyzo commented 6 years ago

You should also check out our progress with gossipsub: https://github.com/libp2p/go-floodsub/pull/67

We want to have a mode for single/fixed source epidemic optimizations in the next iteration.

dominictarr commented 6 years ago

@vyzo is there a description of the design that is higher-level than the protocol? Like, what performance properties is it expected to have? GRAFT and PRUNE seem a bit like EBT, but it's hard to tell what's intended.

vyzo commented 6 years ago

The intention is to build a mesh of bounded degree; basically to mold floodsub into something that doesn't blow up from amplification as you build larger overlays. Wrt performance, there are some sample simulations in https://github.com/vyzo/gerbil-simsub; I have run a lot more, and the samples are representative.

The protocol could be turned into EBT by sending GRAFT/PRUNE in response to gossip/late messages, although some care must be exercised for stability. It might be more appropriate to use a CHOKE message for suppressing flow from late messages without severing the link.

dominictarr commented 6 years ago

@vyzo by degree you mean the number of connections? as in https://en.wikipedia.org/wiki/Degree_(graph_theory)

I still don't understand what GRAFT and PRUNE do. Are they messages a peer sends to another peer? Do they have an argument (such as which source they are about)?

vyzo commented 6 years ago

The number of active connections used by the overlay to propagate messages; it's exactly the node degree in the graph-theoretic sense.

GRAFT and PRUNE are sent between peers to forge the overlay. They take the topic as their "argument"; there is no notion of a single source at the protocol level.

dominictarr commented 6 years ago

@vyzo is it possible to put gossipsub into a "lazy mode" where it sends a message saying that you have received an item, but without sending the whole thing? And is that associated with the subscription channel somehow?

dominictarr commented 6 years ago

hmm, but I'm guessing that "IHAVE" messages are just hashes (as in the style of the ipfs replication protocol)

vyzo commented 6 years ago

You want to just send IHAVE messages in place of forwarding the actual messages? It is possible, but it's not implemented that way. What do you have in mind?

vyzo commented 6 years ago

The IHAVE messages contain the message ids seen in the last 3 seconds; they are not hashes.

dominictarr commented 6 years ago

@vyzo what is a message id? if it's not a hash is it channel_id+sequence or channel_id+timestamp? (in my ebt, channel id + sequence are used for replication, but in all other situations ssb uses hash as id)

vyzo commented 6 years ago

The message id is constructed using the sender ID together with an atomic counter.
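For illustration only (this is not the go-floodsub code), that scheme could look roughly like:

```ts
// Hypothetical sketch: a message id built from the sender id plus a
// per-sender monotonically increasing counter.
let counter = 0
const messageId = (senderId: string): string => `${senderId}:${++counter}`

messageId('QmSender') // e.g. "QmSender:1"
```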

pgte commented 6 years ago

@vyzo this is a bit obscure to me.. is there a place where this gossip-sub protocol is described in a more abstract / reusable way?

vyzo commented 6 years ago

@pgte there is https://github.com/vyzo/gerbil-simsub; it looks like it's time to make a spec pr in libp2p-specs though.