automerge / automerge-classic

A JSON-like data structure (a CRDT) that can be modified concurrently by different users, and merged again automatically.
http://automerge.org/
MIT License
14.75k stars 467 forks

API for communicating deltas #20

ept closed 7 years ago

ept commented 7 years ago

Tesseract requires an API through which it will integrate with the networking layer. It should provide a clean separation between the network and CRDT concerns, so that Tesseract doesn't need to know anything about WebRTC, and the network library doesn't need to know anything about CRDTs.

The starting point is a vector clock, which is a mapping from store UUIDs to sequence numbers:

vclock = {'1234...': 15, '4321...': 7, ...}

It simply means that we have seen 15 operations that originated on store '1234...', 7 operations that originated on '4321...', etc. Applying an operation that originated on another store does not count as a new operation. Only making a local edit creates a new operation and increments the sequence number for the store on which the edit was made.
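As a rough illustration of those rules (the helper names `recordLocalEdit` and `mergeVClocks` and the shortened store IDs are placeholders of mine, not part of the proposed API), a local edit bumps only the local store's counter, while learning about remote operations just takes the per-store maximum:

```javascript
// Hypothetical helpers; the proposal only specifies the shape of the vclock map.

// A local edit creates a new operation: only the originating store's counter advances.
function recordLocalEdit(vclock, storeId) {
  return { ...vclock, [storeId]: (vclock[storeId] || 0) + 1 }
}

// Applying remote operations never creates new sequence numbers;
// we simply remember the highest sequence number seen per store.
function mergeVClocks(a, b) {
  const merged = { ...a }
  for (const [storeId, seq] of Object.entries(b)) {
    merged[storeId] = Math.max(merged[storeId] || 0, seq)
  }
  return merged
}

// '1234' and '4321' are placeholder store IDs.
const aliceClock = recordLocalEdit({ '1234': 15, '4321': 7 }, '1234')
const bobClock = { '1234': 15, '4321': 9 }
const mergedClock = mergeVClocks(aliceClock, bobClock)
```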

The simplest protocol for exchanging deltas would then look something like this:

   Alice:                            Bob:

   alice.                            bob.
   getVClock()                       getVClock()

              \                    /
       Alice's  \                /  Bob's
          vector  \            /  vector
             clock  \        /  clock
       (aliceVClock)  \    /  (bobVClock)
                        \/
                       /  \
                     /      \
                   /          \
                 /              \
alice.         /                  \      bob.
getDeltas( <--'                    `-->  getDeltas(
  bobVClock)                               aliceVClock)

              \                    /
       Alice's  \                /  Bob's
          deltas  \            /  deltas
     (aliceDeltas)  \        /  (bobDeltas)
                      \    /
                        \/
                       /  \
                     /      \
                   /          \
alice.           /              \      bob.
applyDeltas( <--'                `-->  applyDeltas(
  bobDeltas)                             aliceDeltas)

In words, each peer first sends the other peer its vector clock ("this is what I know"); whenever it receives a vector clock from another peer, it generates a response containing any operations that other peer is missing ("this is what I know but you don't know yet"); and when it receives any operations from another peer, it applies them locally ("this is what I have learned from the other node").

This means that we would need three new functions: getVClock(), getDeltas(), and applyDeltas().

getVClock() and getDeltas() are read-only and don't change the store; only applyDeltas() returns a new copy of the store with changes applied.
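To make the exchange concrete, here is a minimal sketch of the three functions over a toy store. The store shape (`{ops: {origin: [...]}}`) and the delta shape (`{origin, seq, op}`) are assumptions for illustration only; the real encoding is not decided in this thread.

```javascript
// Toy store: maps each origin store ID to that store's operations, in sequence order.
// This shape is an assumption made for illustration.

// Read-only: how many operations we have seen from each origin.
function getVClock(store) {
  const vclock = {}
  for (const [origin, ops] of Object.entries(store.ops)) vclock[origin] = ops.length
  return vclock
}

// Read-only: everything we know that the peer (described by its vclock) does not.
function getDeltas(store, peerVClock) {
  const deltas = []
  for (const [origin, ops] of Object.entries(store.ops)) {
    const seen = peerVClock[origin] || 0
    ops.slice(seen).forEach((op, i) => deltas.push({ origin, seq: seen + i + 1, op }))
  }
  return deltas
}

// Returns a new store with the deltas applied; the input store is left untouched.
function applyDeltas(store, deltas) {
  const ops = {}
  for (const [origin, list] of Object.entries(store.ops)) ops[origin] = list.slice()
  for (const { origin, seq, op } of deltas) {
    const list = ops[origin] || (ops[origin] = [])
    if (seq === list.length + 1) {
      list.push(op) // the next operation we were expecting from this origin
    } else if (seq > list.length + 1) {
      throw new Error(`gap in operations from ${origin}`)
    } // otherwise we have already seen this operation, so ignore it
  }
  return { ops }
}

// The exchange from the diagram, with 'alice' and 'bob' as placeholder store IDs.
let alice = { ops: { alice: ['a1', 'a2'] } }
let bob = { ops: { bob: ['b1'] } }

const aliceVClock = getVClock(alice)
const bobVClock = getVClock(bob)
const aliceDeltas = getDeltas(alice, bobVClock)
const bobDeltas = getDeltas(bob, aliceVClock)
alice = applyDeltas(alice, bobDeltas)
bob = applyDeltas(bob, aliceDeltas)
```

After the exchange, both stores should report the same vector clock, and repeating the exchange should produce empty delta lists.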

So far, so good. The added challenge arises if something has got screwed up somehow, so that (for example) one node thinks that sequence number 16 for node '1234...' means "assign card 1 to martin", whereas another node thinks that sequence number 16 for node '1234...' means "rename card 3 to foo". That would not merely be a conflict (because conflicting operations should always have different origin nodes), but a violation of the protocol.

This situation could happen, for example, if node '1234...' first generated the "assign card 1 to martin" operation and sent it to another node; then node '1234...' failed and was restored from a backup, and that backup only went up to sequence number 15; then the user performed the "rename card 3 to foo" action, and node '1234...' unwittingly reused the sequence number 16 to mean something different.

The first step towards solving this problem is to detect it. I propose doing that by including a hash of the operations in the encoded vector clock, and then getDeltas() and applyDeltas() can check that the sequence numbers and hashes match. Once a mismatch has been detected, we would need some recovery protocol that figures out exactly where the mismatch has occurred, and how to resolve it. My proposal would be to leave that recovery protocol out of scope for now, so we will merely detect a screwed-up state and raise an error. In future, we can try to automatically resolve it.

To be clear, this kind of inconsistency should not happen in normal operation (including during network partitions), but only happen due to bugs or amnesia (a device doing something and then forgetting that it did it). If we just raise it as an error for now, we can observe how often it actually occurs in practice, and solve it if it's enough of a problem.
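One possible shape for the detection step, sketched under my own assumptions: each store's clock entry carries a hash chained over all of that store's operations, so two nodes that agree on a sequence number but disagree on the history will produce different hashes. The names `chainHash`, `clockEntry` and `checkAgainst` are hypothetical, the code uses Node's built-in `crypto` module, and it assumes operations serialise deterministically with `JSON.stringify`.

```javascript
const crypto = require('crypto')

// The hash for operation n covers the hash for operation n-1, so it
// summarises the whole history of one store up to that sequence number.
function chainHash(prevHash, op) {
  return crypto.createHash('sha256')
    .update(prevHash)
    .update(JSON.stringify(op))
    .digest('hex')
}

// The {seq, hash} pair a vector clock could carry for a single origin store.
function clockEntry(ops) {
  let hash = 'start'
  for (const op of ops) hash = chainHash(hash, op)
  return { seq: ops.length, hash }
}

// Detection only: compare our history, truncated to the remote sequence number,
// with the remote hash. Recovery is deliberately out of scope.
function checkAgainst(localOps, remoteEntry) {
  if (localOps.length < remoteEntry.seq) return // we cannot verify what we have not seen
  const local = clockEntry(localOps.slice(0, remoteEntry.seq))
  if (local.hash !== remoteEntry.hash) {
    throw new Error('protocol violation: sequence number reused for a different operation')
  }
}

// The backup scenario above, shortened to three operations.
const shared = ['create card 1', 'create card 3']
const beforeCrash = shared.concat(['assign card 1 to martin'])
const afterRestore = shared.concat(['rename card 3 to foo'])

let detected = false
try {
  checkAgainst(afterRestore, clockEntry(beforeCrash))
} catch (err) {
  detected = true
}
```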

ept commented 7 years ago

In terms of integrating this API with a Redux app, I envisage something like the following:

// Network is some fictitious wrapper around WebRTC, or suchlike.
// It is initialised when the app is started, and then constantly runs
// in the background, trying to establish connections with peers.
const network = Network.connect({signalling_server: 'slack', ...})

// Callback for when a new peer is discovered
network.onPeerConnect(peer => {
  peer.sendVClock(tesseract.getVClock(this.store))
})

network.onVClockReceived((peer, vclock) => {
  peer.sendDeltas(tesseract.getDeltas(this.store, vclock))
})

network.onDeltasReceived((peer, deltas) => {
  this.store.dispatch({type: "APPLY_DELTAS", deltas: deltas})
})

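// In the Redux reducer:
this.store = createStore((state, action) => {
  switch (action.type) {
    case "APPLY_DELTAS":
      return tesseract.applyDeltas(state, action.deltas)
    default:
      // Redux requires the reducer to return the current state for unknown actions
      return state
  }
})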
ept commented 7 years ago

Update: perhaps the network could be wrapped up as a component, which would then interface with Redux quite nicely:

// Named NetworkComponent so that it does not shadow the Network library used below
export default class NetworkComponent extends React.Component {
  constructor(props) {
    super(props)
    const store = props.store
    this.state = store.getState()
    store.subscribe(() => { this.setState(store.getState()) })

    this.network = Network.connect({
      signalling: 'slack',
      // Pass the getState function into the Network library, so that it can fetch
      // the current state whenever it needs it
      currentStateGetter: store.getState
    })

    this.network.onDeltasReceived(deltas => {
      store.dispatch({type: "APPLY_DELTAS", deltas: deltas})
    })
  }

  // This component only wires up the network; it renders nothing itself
  render() {
    return null
  }
}

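// In the Redux reducer:
this.store = createStore((state, action) => {
  switch (action.type) {
    case "APPLY_DELTAS":
      return tesseract.applyDeltas(state, action.deltas)
    default:
      // Redux requires the reducer to return the current state for unknown actions
      return state
  }
})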

In this API style, the concept of a VClock doesn't leak into the application code, and there is no need for handling separate onPeerConnect and onVClockReceived events (as the Network library can call getVClock and getDeltas directly on the tesseract object). Only the deltas pass through Redux, and they don't need to be interpreted by the app code — they simply get passed through.
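To illustrate what "the Network library calls getVClock and getDeltas directly" might look like internally, here is a sketch. Everything in it is hypothetical: `connectNetwork`, the `transport` and `crdt` objects, and the message shapes are stand-ins of mine, not an existing library API. The in-memory fakes at the bottom exist only to exercise the wiring.

```javascript
// Hypothetical internals of a network wrapper: vector clocks are exchanged
// here, and only deltas are surfaced to the application.
function connectNetwork({ transport, crdt, currentStateGetter }) {
  const deltaHandlers = []

  // On connect, tell the peer what we already know.
  transport.onPeerConnect(peer => {
    peer.send({ vclock: crdt.getVClock(currentStateGetter()) })
  })

  transport.onMessage((peer, message) => {
    if (message.vclock) {
      // The peer told us what it knows; reply with whatever it is missing.
      peer.send({ deltas: crdt.getDeltas(currentStateGetter(), message.vclock) })
    }
    if (message.deltas) {
      // Hand deltas to the app uninterpreted; it just dispatches them.
      deltaHandlers.forEach(handler => handler(message.deltas))
    }
  })

  return { onDeltasReceived: handler => { deltaHandlers.push(handler) } }
}

// Minimal in-memory fakes, for illustration only.
const sent = []
const listeners = {}
const transport = {
  onPeerConnect: cb => { listeners.connect = cb },
  onMessage: cb => { listeners.message = cb },
}
const peer = { send: message => sent.push(message) }
const crdt = {
  getVClock: state => state.vclock,
  getDeltas: (state, vclock) => state.ops.slice(vclock.local || 0),
}
const appState = { vclock: { local: 2 }, ops: ['op1', 'op2'] }
const received = []

const network = connectNetwork({ transport, crdt, currentStateGetter: () => appState })
network.onDeltasReceived(deltas => received.push(...deltas))

listeners.connect(peer)                           // we announce our vclock
listeners.message(peer, { vclock: { local: 1 } }) // the peer is missing op2
listeners.message(peer, { deltas: ['op3'] })      // the peer sends us something new
```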

ept commented 7 years ago

Regarding deltas, there is also a question of how the app finds out what has changed (for example, in order to visually highlight some data that was updated by another user). My hunch is that we should do that not by having the app parse the deltas (which could quickly become a mess), but rather by allowing the app to compute the difference between the old and the new state (before and after applyDeltas). But I'm open to ideas.
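As a starting point for discussion, the state-diffing approach could be as simple as the sketch below. It assumes the state is plain JSON-like data; `changedPaths` is a hypothetical helper, not something Tesseract provides.

```javascript
// Returns the dotted paths whose values differ between two JSON-like states.
// Subtrees that are reference-equal are skipped without being traversed.
function changedPaths(before, after, prefix = []) {
  if (before === after) return []
  const bothObjects = before !== null && after !== null &&
    typeof before === 'object' && typeof after === 'object'
  if (!bothObjects) return [prefix.join('.')]
  const keys = new Set([...Object.keys(before), ...Object.keys(after)])
  const paths = []
  for (const key of keys) {
    paths.push(...changedPaths(before[key], after[key], prefix.concat(key)))
  }
  return paths
}

// State before and after a remote "assign card 1 to martin" operation is applied.
const before = { cards: [{ title: 'foo', assignee: null }, { title: 'bar', assignee: null }] }
const after = { cards: [{ title: 'foo', assignee: 'martin' }, { title: 'bar', assignee: null }] }
const changed = changedPaths(before, after)
```

Here `changed` contains the single path `cards.0.assignee`, which the app could use to highlight that field.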

choxi commented 7 years ago

@ept that's more or less correct, but I would make a few additional changes:


// The network doesn't need its own component; we initialize Redux and Tesseract
// in the master App component, so that would be a good place to initialize the
// network as well
export default class App extends React.Component {
  constructor(props) {
    super(props)
    this.tesseract = Tesseract.init()

    // In the Redux reducer:
    this.store = createStore((state, action) => {
      switch (action.type) {
        case "APPLY_DELTAS":
          return Tesseract.applyDeltas(state, action.deltas)
        default:
          return state
      }
    })

    this.store.subscribe(() => {
      // Is this only used when broadcasting changes? If so, it would be
      // clearer to call this network.broadcast to make it more explicit
      // to the developer why the network needs our state object
      this.network.setState(this.store.getState())
    })

    this.network = Network.connect({
      signalling: 'slack',
    })

    this.network.onDeltasReceived(deltas => {
      this.store.dispatch({type: "APPLY_DELTAS", deltas: deltas})
    })
  }
}
ept commented 7 years ago

@choxi That looks good to me, thanks for the feedback. 👍 I'll have a go at implementing that API in Tesseract tomorrow.

pvh commented 7 years ago

@ept and @choxi, I spent some time working on the network stuff today with Orion and I have a few notes.

First, I think responsibility for encoding the deltas to send over the wire should lie with the network libraries and not with Tesseract. That will increase composability (for optional features like encryption, client signing, compression, etc.) and reduce the scope of Tesseract's responsibility to just the CRDT.

Second, I would prefer to get a list of deltas rather than a full blob of them. There are a few good reasons for this. The maximum WebRTC message size is likely to be prohibitively small, and we may want to treat different deltas or clients differently in the future (for example, one might have to send messages differently over a Bluetooth connection). If there are a great many deltas, we'll probably also want the option to build up the document as they stream in rather than blocking the universe. Finally, building big strings can be surprisingly expensive. Let's leave that work to the JSON library, which is presumably well optimized.
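For example, with a list of deltas the network layer could group them into messages that fit a transport limit. This is only a sketch: `batchDeltas` is a hypothetical helper, the limit is whatever the transport dictates (no figure is given here), and sizes are measured in JavaScript string length, which only approximates bytes on the wire.

```javascript
// Lazily groups deltas into batches whose JSON encoding stays within maxLength.
// A single delta larger than maxLength is still yielded, alone in its own batch.
function* batchDeltas(deltas, maxLength) {
  let batch = []
  let size = 2 // the enclosing "[" and "]"
  for (const delta of deltas) {
    const deltaSize = JSON.stringify(delta).length + 1 // +1 for the separating comma
    if (batch.length > 0 && size + deltaSize > maxLength) {
      yield batch
      batch = []
      size = 2
    }
    batch.push(delta)
    size += deltaSize
  }
  if (batch.length > 0) yield batch
}

// The delta shape here is a placeholder.
const deltas = [
  { seq: 1, op: 'a' },
  { seq: 2, op: 'b' },
  { seq: 3, op: 'c' },
]
const batches = Array.from(batchDeltas(deltas, 45))
```

Because it is a generator, the sender can encode and transmit each batch as it is produced instead of building one large string up front.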

So - overall, I think the API should look about thus:

The networking code would look thus:

// The network doesn't need its own component; we initialize Redux and Tesseract
// in the master App component, so that would be a good place to initialize the
// network as well
export default class App extends React.Component {
  constructor(props) {
    super(props)
    this.tesseract = Tesseract.init()

    // In the Redux reducer:
    this.store = createStore((state, action) => {
      switch (action.type) {
        case "APPLY_DELTAS":
          return Tesseract.applyDeltas(state, action.deltas)
        default:
          return state
      }
    })

    this.store.subscribe(() => {
      // Is this only used when broadcasting changes? If so, it would be
      // clearer to call this network.broadcast to make it more explicit
      // to the developer why the network needs our state object
      this.network.setState(this.store.getState())
    })

    this.network = Network.connect({
      signalling: 'slack',
    })

    // Deltas are passed straight through to Redux without being interpreted
    const onDeltasReceived = deltas => {
      this.store.dispatch({type: "APPLY_DELTAS", deltas: deltas})
    }

    this.network.onMessageReceived(message => {
      const content = JSON.parse(message.data)
      if (content.vectorClock) {
        // this should probably yield deltas as they're calculated, and they should get packed
        const state = this.store.getState()
        const deltas = this.tesseract.deltasAfter(content.vectorClock, state)
        const reply = {vectorClock: state.getVectorClock(), deltas: deltas}
        this.network.sendMessage(reply)
      }
      if (content.deltas) {
        onDeltasReceived(content.deltas)
      }
    })
  }
}
choxi commented 7 years ago

Could this all be pulled into the network.onMessageReceived function before it invokes the callback? It adds a lot of mental overhead to the DX because now the developer has to know about the CRDT protocol being used to send/receive deltas.

const content = JSON.parse(message.data)
if (content.vectorClock) {
  // this should probably yield deltas as they're calculated, and they should get packed
  const deltas = tesseract.deltasAfter(content.vectorClock, state)
  const reply = {vectorClock: state.getVectorClock(), deltas: deltas}
  network.sendMessage(reply)
}
pvh commented 7 years ago

Yeah, we'll hide all that from Trellis before too long, but I'd like the individual pieces to be cleanly factored.

ept commented 7 years ago

I have done an initial implementation of deltas in 2b6d76e. @pvh it gives you JS objects that you can JSON.stringify, as requested. It doesn't yet have the hash chaining to detect reuse of sequence numbers, that's still to come. Let me know what you think.

ept commented 7 years ago

This network communication layer has been running for a while now, so I'll close this issue. There is still more to be done, but I'll move the remaining work to separate issues: