canvasxyz / canvas

Programmable runtime for peer-to-peer applications
https://canvas.xyz
62 stars 5 forks source link

Add merge functions #206

Open raykyri opened 7 months ago

raykyri commented 7 months ago

We want to allow contract writers to use CRDTs when writing applications, so that they can implement collaboratively mutable data structures like:

To support CRDTs, we could implement them individually in the model framework, but a more general solution would be to add a $merge function that is called whenever a conflicting set of writes is seen in the database table:

export const models = {
  docs: {
    id: 'primary',
    text: 'string',
    $merge: ([{ id, text, $id }, { id: id2, text: text2, $id: $id2 }]) => {
      return { id, text: merge(text, text2) }
    }
  }
}

The merge function takes two (and perhaps more) arguments, each of which represents the row that was edited concurrently along different causal paths. It merges those rows together and returns the merged row, which gets written to the database.

To figure out the arguments to $merge, we need to detect whether there are conflicting / unmerged branches in the causal log (relative to the record that is returned when we call db.get()) and fetch the most recent write to that database row. Note that there may be zero, one, or many unmerged branches, but we can just recursively call the merge function to support multiple unmerged branches.

Branch Indexing

First, we track of the number of parallel branches of history at any given time. We do this by keeping a branch counter, which starts at 0 and is incremented every time an action is added and any of its parents already have children. We annotate each message in the GossipLog store with the index of the branch that it's on.

Whenever the causal log branches, the branch counter is incremented by 1, and nodes on the newly forked branch are assigned the largest value of the branch id, which looks like this:

A1=0 --- A2=0 --- A3=0 --- A4=0 --> ...
      |                 |
      -- A5=1 --- A6=1 --
               |
               -- A7=2 --> ...

Alternatively, if an action at time X A_x doesn't create a new branch, then it just inherits the min of the indexes of its parents.

The branch counter is only tracked locally -- different nodes may have different branch indexes for the same message, depending on the order in which they receive operations. (Branch indexes are never reused, so as the log fills, the branch index will grow larger.)

Second, we keep track of the maximum branch index at each clock value. This can be done when each message is added to the log - we can just take the max of the newly added message's branch index, and the current maximum branch index.

Third, we also need to keep track of where branches were merged into another branch, since it's possible that if we're querying for an action A_t at time t on branch B', the most recent write to the model actually happened on another branch B that merged into branch B'. So, whenever a branch is merged (i.e. an action is created with parents with multiple branch indexes) then we keep a record of the merge. Later, recursively traversing this graph will allow us to get the value of the model at clock time T.

|------------|
| B | B'| T  |
|------------|
| 3 | 1 | 10 |
| 1 | 0 | 11 |
| 3 | 0 | 11 |
| 4 | 3 | 15 |
|------------|

Querying

Lazy merging: We call the merge function lazily - otherwise, we would have to scan each model store in its entirety for unmerged pairs of actions every time a message is received with multiple parents. (A different approach to merge functions might implement eager merging, where we efficiently diff the model database when an action with multiple parents is published, but this requires implementing a Merkle index or Prolly tree at the model table level, which is out of scope for this exercise.)

To decide which model values are provided as arguments to the merge function, we have to compute what the model value M_x was for each parent A_x of the message.

We can do this by looping over each parent:

  1. Get the branch index of the parent.
  2. Query for the latest value of M_x on the branch of the parent. This is now the lower bound b for further queries: we are only looking for other branches that could possibly have written to M_x more recently than b. (Note that b might be undefined, if write to M_x occurred on the branch.)
  3. Query for branches that merged into A_x after b; we call this the parent set.
  4. On each of those branches, query for the latest value of M_x. For any values of M_x we find on these parallel branches, add them to the list of arguments to the merge function.
  5. Repeat steps 3-4, querying for branches that merged into any of the parent set of branches, using the new lower bound that we found in the parent set branch (or no lower bound, if no write to M_x was found on the branch that merged into the parent).

This allows us to collect all the values of M_x from branching topologies that look like this:

A1 --- A2 --- A3 --- A5 --- A10 --- A11
                         |
              A4 --- A6 -|
               |
       A8 -----|
       |
A9 --- |

The query algorithm should produce a set of model values M_x1, M_x2, .... Deduplicate these values (by message ID) and pass them to the merge function.

joeltg commented 7 months ago

alright I think finally understand this, so let me summarize and tell me if it's right so far

context/motivation:

  1. we want to support db.get
  2. db.get has to be deterministic, which means it must only use the effects of the current message's transitive ancestors, and nothing else
  3. the current behavior is to return the "last" value across all branches, where "last" sorts first by clock value and then by message hash. this has some undesirable properties, like "mixing and matching" values between branches

proposed alternative:

  1. db.get(model, key) identifies the set of "concurrent" values for the key, ie the values from all the actions in the set of transitive ancestors who wrote (set or delete) to that key and were not subsequently overwritten by a more recent ancestor.
  2. this set of values is reduced one-by-one using the model's $merge function, and the result is returned from db.get. if the set is empty, db.get returns null.

and then the branch indexing thing is a way to implement getConcurrentValues(messageId, model, key)

raykyri commented 7 months ago

yes that's exactly it

it's potentially expensive in a world with many concurrent merge branches. it may be the case that we want to "settle" the results of the merge function back to the log by making the equivalent of a git merge commit.

rjwebb commented 7 months ago

ok this makes sense to me

joeltg commented 1 month ago

EDIT: disregard this entire comment, this is not sound unfortunately :(


Bringing the design here a little closer to code - we want to implement an internal utility method getConcurrentValues which will allow us to do the "lazy merging" described here.

declare function getConcurrentValues(
  parents: string[],
  model: string,
  key: string,
): Iterable<{ messageId: string; value: ModelValue }>

When an action handler calls db.get for a model with a merge function, we iterate over the result of getConcurrentValues(parents, model, key), reducing the values using the user-provided merge. This means AbstractRuntime.getModelValue will get factored into two "cases" - the current one which implements last-write-wins, and a new case for merge reduction.

The main problem is how to implement AbstractRuntime.getConcurrentValues. One way to do this without adding additional indices is to use ${model}/${keyHash} key in the $effects table as a mutable cache of concurrent message IDs (string[]).

Whenever a record is set or deleted, we look up its current concurrent effect IDs:

const effectIDs: string[] = db.get("$effects", `${model}/${keyHash}`)

Then we filter out any that are ancestors of the current message, and add the current message ID.

// context: ExecutionContext
const { parents } = context.message

const filteredEffectIDs = effectIDs.filter(
  (id) => parents.some(
    (parent) => !context.txn.isAncestor(context.id, parent)
  )
)

db.set("$effects", {
  key: `${model}/${keyHash}`,
  value: [...filteredEffectIDs, context.id]
})

Then to implement AbstractRuntime.getConcurrentValues(parents, model, key) we can do

function getConcurrentValues(context: ExecutionContext, parents: string[], model: string, key: string) {
  const keyHash = AbstractRuntime.getKeyHash(key)
  const concurrentEffectIDs: string[] = db.get("$effects", `${model}/${keyHash}`)
  return concurrentEffectIDs.filter((id) => context.txn.isAncestor(context.id, id))
}