basho / riak

Riak is a decentralized datastore from Basho Technologies.
http://docs.basho.com
Apache License 2.0

Add more CRDT support to Riak #354

Closed russelldb closed 10 years ago

russelldb commented 11 years ago

CRDT support in Riak

Aim

Provide data types in Riak that make it easier for developers to reason about eventual consistency. Currently in Riak we store opaque data. We do nothing to help the developer model their problem or deal with eventual consistency. By providing a set of data types in Riak that a developer can use to compose a complex value with well-defined merge semantics, we aim to make using Riak easier.

Idea

To expose CRDTs[1] to developers. Originally we had hoped to avoid the current Get-Modify-Put cycle and provide an interface of simply shipping operations to Riak. However, recent work on this has shown some inconsistencies in cases where the state on the vnode handling the request is not the state observed by the user. There are more examples in the types below, but we have decided that the familiar Get-Modify-Put cycle is needed. When you get a CRDT value, some context information will be returned; this must be sent back to the server along with the operations to perform. Failure to provide the context will lead to less correct behaviour.

We already have an example of CRDTs in riak 1.4 with counters. If a user wants to increment a counter they tell riak "ayo, increment my counter by N". Riak actually stores a CRDT in a riak object, and this operation is just a regular PUT, with some custom merge logic on the vnode. If a user wants the current value of a counter they do a GET to the counter resource and get back a single integer value. This is actually just a normal riak GET, with the merge run by Riak. This makes sense with a counter.

Things get more complex with Sets. When removing an element from a Set, the context of the removal is the current state of the set. If there is no context it is possible that a vnode with no current state (think of a fallback spun up to handle a request) will not be able to remove an element.

Things are more complex still with Maps (see below.)

It is the unpredictability of shipping only Operations that has led us to require a context for each operation.

It seems likely that a lot of work will be on optimising the size of the context.

Data Types

We plan to offer the following data types. Implementation details of the chosen type, and unsolved issues below.

Counter

We already support the PN-Counter.

Details

Much like a version vector: a pair of lists of two-tuples {actor(), integer()}. The first list is the P Counter. It holds all the increments for the counter. The second list is the N Counter. It holds all the decrements. Each actor (in Riak, a vnode) only increments its own entry in the list. The value of the counter is the difference between the sum of the P Counter and the sum of the N Counter.
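The structure described above can be sketched in a few lines of Erlang. This is a minimal illustration, not the riak_dt code: the module name, function names, and the per-actor-maximum merge (the usual state-based merge for this kind of counter) are assumptions made for the example.

```erlang
-module(pn_counter_sketch).
-export([new/0, increment/3, decrement/3, value/1, merge/2]).

%% A counter is {P, N}: two lists of {Actor, Count} two-tuples.
new() -> {[], []}.

%% Each actor bumps only its own entry in the P list.
increment(Actor, By, {P, N}) when By > 0 ->
    {bump(Actor, By, P), N}.

%% A decrement grows the actor's entry in the N list.
decrement(Actor, By, {P, N}) when By > 0 ->
    {P, bump(Actor, By, N)}.

%% Value is sum(P) - sum(N).
value({P, N}) ->
    sum(P) - sum(N).

%% Assumed merge: per-actor maximum in each list.
merge({P1, N1}, {P2, N2}) ->
    {merge_entries(P1, P2), merge_entries(N1, N2)}.

bump(Actor, By, Entries) ->
    Old = proplists:get_value(Actor, Entries, 0),
    lists:keystore(Actor, 1, Entries, {Actor, Old + By}).

sum(Entries) ->
    lists:sum([Count || {_Actor, Count} <- Entries]).

merge_entries(A, B) ->
    lists:foldl(fun({Actor, Count}, Acc) ->
                        Old = proplists:get_value(Actor, Acc, 0),
                        lists:keystore(Actor, 1, Acc, {Actor, max(Old, Count)})
                end, A, B).
```

Because every entry only ever grows, merging the same state twice changes nothing, which is what lets replicas exchange state in any order.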

Issues

Set

None of the sets in the literature behave as they would if their concurrent operations were linearized. Of all the sets in the catalog we feel the OR-Set is the most useful and intuitive.

Details

In the literature the OR-Set is made up of two sets of {element(), unique()}. The first set is the Add set, the second the Remove (or tombstone) set. However, for various reasons this proves space inefficient, especially if the same thing is added to the set often (more on that later.) Thanks to Riak being a distributed system with actors we came up with a more space efficient OR-Set, which I'm calling a VVORSet (naming is hard.)

The VVORSet is a dictionary of element() -> {active_bit(), vclock()} mappings. When an actor adds an element to the set it increments its entry in the clock for that element. When an actor removes an element from the set it flips the active_bit from 1 to 0. This is the observed remove (i.e. the actor removes only the updates for that element that it has observed.) Any concurrent add will result in a vclock that is not strictly dominated by the remove clock for the element, and on merge, the element's active_bit will flip back to 1 and the clock will be the add clock.

There is actually a further small optimization for space where actor ids are stored only once in a list, and replaced by a single integer in the clocks. There is some computational expense at update and merge to look up actors, merge actor lists, and replace actors. See the module[3] for more details.
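Leaving the actor-list compaction aside, the dictionary and its local operations can be sketched as follows. This is only an illustration under assumptions: the module and function names are hypothetical, clocks are modelled as plain Erlang maps of actor to counter, and the merge itself is deliberately left out because the text does not fully specify it. See the module[3] for the real thing.

```erlang
-module(vvorset_sketch).
-export([new/0, add/3, remove/2, value/1, clock/2, descends/2]).

%% The set is a map of Element => {ActiveBit, Clock};
%% a Clock is a map of Actor => Counter (an assumed representation).
new() -> #{}.

%% Add: the actor increments its own entry in the element's clock
%% and the active bit is set to 1.
add(Elem, Actor, Set) ->
    {_Bit, Clock} = maps:get(Elem, Set, {0, #{}}),
    Count = maps:get(Actor, Clock, 0),
    maps:put(Elem, {1, maps:put(Actor, Count + 1, Clock)}, Set).

%% Remove: flip the active bit to 0 and leave the clock untouched,
%% so the remove covers only the adds observed so far.
%% A replica with no entry for Elem has nothing to remove.
remove(Elem, Set) ->
    case maps:find(Elem, Set) of
        {ok, {_Bit, Clock}} -> maps:put(Elem, {0, Clock}, Set);
        error -> Set
    end.

%% Only elements whose active bit is 1 are visible.
value(Set) ->
    lists:sort([E || {E, {1, _Clock}} <- maps:to_list(Set)]).

clock(Elem, Set) ->
    {_Bit, Clock} = maps:get(Elem, Set, {0, #{}}),
    Clock.

%% true when clock A has seen everything recorded in clock B.
descends(A, B) ->
    maps:fold(fun(Actor, N, Acc) ->
                      Acc andalso maps:get(Actor, A, 0) >= N
              end, true, B).
```

If one replica removes x while another adds x again, descends/2 applied to the remove clock and the add clock returns false. That is the condition under which, per the description above, a merge flips the active bit back to 1.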

Issues

LWW Register

A single value register that converges on the value with the highest timestamp.

Details

Pretty simple. Either the user supplies a timestamp or the module generates one with:

%% Microseconds since the Unix epoch, built from os:timestamp/0.
make_micro_epoch() ->
    {Mega, Sec, Micro} = os:timestamp(),
    (Mega * 1000000 + Sec) * 1000000 + Micro.
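A register built around that timestamp might look like the sketch below. The module name, the {Timestamp, Value} representation, and the tie-break on equal timestamps (Erlang term order on the value, chosen so that merge stays commutative) are assumptions for illustration, not a description of riak_dt_lwwreg.

```erlang
-module(lwwreg_sketch).
-export([new/0, assign/2, assign/3, value/1, merge/2]).

%% A register is {Timestamp, Value}; the empty register has timestamp 0.
new() -> {0, undefined}.

%% Assign with a module-generated timestamp.
assign(Value, Reg) ->
    assign(Value, make_micro_epoch(), Reg).

%% Assign with a user-supplied timestamp: the same rule as merge.
assign(Value, Ts, Reg) ->
    merge(Reg, {Ts, Value}).

value({_Ts, Value}) -> Value.

%% The highest timestamp wins. On equal timestamps fall back to
%% Erlang term order (an assumed tie-break, not taken from the source).
merge({Ts1, _} = R1, {Ts2, _}) when Ts1 > Ts2 -> R1;
merge({Ts1, _}, {Ts2, _} = R2) when Ts2 > Ts1 -> R2;
merge(R1, R2) -> max(R1, R2).

%% Microseconds since the Unix epoch.
make_micro_epoch() ->
    {Mega, Sec, Micro} = os:timestamp(),
    (Mega * 1000000 + Sec) * 1000000 + Micro.
```

The order in which two replicas are merged does not matter: either way the value written with the larger timestamp survives.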

Issues

Flags

These are one-way flags. Enable flags start off and converge to on. Disable flags start on and converge to off.
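A minimal sketch of the two flag types, with hypothetical module and function names and the atoms on / off standing in for whatever representation is actually used:

```erlang
-module(flag_sketch).
-export([new_enable/0, enable/1, merge_enable/2,
         new_disable/0, disable/1, merge_disable/2]).

%% Enable flag: starts off, can only move to on.
new_enable() -> off.
enable(_Flag) -> on.

%% Merge is a logical OR: once any replica says on, the result is on.
merge_enable(on, _) -> on;
merge_enable(_, on) -> on;
merge_enable(off, off) -> off.

%% Disable flag: starts on, can only move to off.
new_disable() -> on.
disable(_Flag) -> off.

%% Merge is a logical AND: once any replica says off, the result is off.
merge_disable(off, _) -> off;
merge_disable(_, off) -> off;
merge_disable(on, on) -> on.
```

There is no operation that moves a flag back, which is what keeps the merge trivially convergent.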

Map

A Map of {name(), crdt_type()} -> crdt() CRDT that allows composition of complex documents made up of CRDTs. The restriction is that the type of each field must be declared. This avoids the need for a lattice of CRDT types, which we feel would lead to all Maps ultimately being Maps of Maps of Maps of Maps as we attempt to merge a name -> value || name -> value2 conflict into a single Map instance of both values (or an MVV, but why have siblings with CRDTs?)

We have a prototype (see below) that allows users to update / add / remove sub elements in the Map.

Details

The Map is implemented as a VVORSet of {name(), type()} pairs called the Key Set and a dictionary of {name(), type()} -> crdt() mappings called the Value List. name is anything stringish, and type is one of the supported CRDT modules.

We use a VVORSet for the keyset to allow concurrent update / remove of fields. The need for each update on a field to be recorded as an add to the key set, so that it wins over a concurrent remove, is what led us to develop the VVORSet in the first place.

When a user wishes to add a field to the Map they can simply issue an update operation to that field. If the field is not present at the local replica an empty CRDT of type is created, the operation is applied, and the value is stored. A remove results in a VVORSet remove for the key, and we remove the value from the Value List at once. We received a draft paper from Baquero et al (after we made our map) that has a similar state based CRDT spec that we used to develop our EQC model. There is room for discussion about tombstoning values too, as there are some oddities in concurrent update | remove, but we think providing the context overcomes most of these. See below for more details.
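The update path described above (create an empty CRDT of the declared type if the field is missing, apply the operation, store the result) can be sketched as follows. The sketch covers only the Value List: the Key Set, removes, and merge are omitted. The type tags flag and lwwreg, the operation shapes, and all function names are hypothetical stand-ins for the real CRDT modules.

```erlang
-module(map_update_sketch).
-export([new/0, update/3]).

%% The Value List: a map of {Name, Type} => Value.
new() -> #{}.

%% Empty value for each (hypothetical) supported type tag.
empty(flag)   -> false;
empty(lwwreg) -> {0, undefined}.

%% Apply one operation to a value of the given type.
apply_op(flag, enable, _Old) ->
    true;
apply_op(lwwreg, {assign, Val, Ts}, {OldTs, _OldVal}) when Ts > OldTs ->
    {Ts, Val};
apply_op(lwwreg, {assign, _Val, _Ts}, Old) ->
    Old.

%% Update a field: if it is absent locally, start from an empty
%% value of the declared type, then apply the operation and store it.
update({_Name, Type} = Field, Op, Map) ->
    Old = maps:get(Field, Map, empty(Type)),
    maps:put(Field, apply_op(Type, Op, Old), Map).
```

Because the type is part of the key, {email, lwwreg} and {email, flag} would be two distinct fields, so a merge never has to reconcile values of different types under one name.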

Issues

Garbage Collection

Here is the current thinking on GC:

What is garbage?

As mentioned in the tour de types above, garbage is different for different CRDTs. In counters it is retired actors, in Sets it is tombstone elements. In Maps it may even include tombstone values.

In Sets garbage is very easy to identify (tombstones); in counters it is harder. How do we decide what is a retired actor? The garbage in all CRDTs is a result of the monotonic function, or at least constant inflation, of CRDTs. It is this constant growth that gives them the merge properties that they have. However, after all replicas have seen a concurrent add and remove, the tombstone in a set is useless. So some garbage can safely be removed.

This garbage cannot be removed in an ad hoc, unilateral manner, as it will simply be re-introduced on a merge. In the case of Sets this would be wasteful, in the case of counters, it would lead to incorrect values.

Garbage then slows the system down over time. Reading a Set with many tombstones from disk, sending it over the network, is all wasteful. We would like to at least get the garbage out of the primary read-write path of an object.

A man, a plan, a (garbage) can

Some suggestions exist in the literature for garbage collection, like ORSWOTS[4] or Handoff Counters[2], but neither is a general purpose mechanism for multiple CRDT types (and [4] requires causal delivery.)

I'm going to focus on Sets for now, since (a) they have the most obvious garbage problem, (b) we can sidestep the problem of what is garbage, and (c) I doubt counters will accrue much garbage.

I think that we can't remove a tombstone element while there is a chance that a concurrent add of that element exists anywhere in the cluster. In order to be sure no such add exists we would need to check every node.

Equally, I think it would be a bad idea to require every node in the cluster to be available to collect garbage from a Set (but maybe that is fine.)

My current idea is to take advantage of Joseph Blomstedt's proposed strong consistency work to write an immutable log of garbage collection epochs per key. This means that any single replica can perform a unilateral garbage collection, and all other replicas can 'catch up' when they merge and detect that they're from a different epoch. Of course this just moves the garbage off the critical path and leaves us with the problem of truncating or trimming the log. We can use a full cluster gossip protocol to truncate the log as it can happen lazily in the background as nodes are available. Details of these three (GC log, catch up, GC log pruning) protocols to follow shortly.

Multi Data Centre

For GC to work in MDC clusters of clusters we think a cluster local / cluster remote view of CRDTs is required. This would require a pre-repl / post-repl hook of some sort that allows a CRDT to roll up its local representation prior to replication.

Since we have implemented an actor based OR-Set, and the counters are actor based, we think this is as simple as replacing all local actor IDs with a single ClusterId at roll up before repl, and dropping a cluster's local ID from the state on repl receive.

This is way over simplified and one of the things I'm still figuring out.

API / User Interface

This is a thorny issue. It would be great to allow clients (like the Java Client) to create Objects that are based on Maps, with fields annotated with CRDT types and never have to deal with siblings ever.

We've started looking at some ways to represent the manipulation / querying of maps. Please comment on the examples:

https://github.com/metadave/gravy#update
https://gist.github.com/seancribbs/1020fb807e65dfda8d97
https://gist.github.com/russelldb/3c9613d1067e0cd71736 (this one is how the prototype works, not really an option)


[1] CRDTs - A comprehensive study of Convergent and Commutative Replicated Data Types
[2] Handoff Counters
[3] VVORSet implementation - riak_dt_vvorset
[4] ORSWOTS - An Optimized Conflict-free Replicated Set

russelldb commented 11 years ago

Copying initial comments from private repo:

From @metadave

@seancribbs and I are talking about a general query language for Riak, to be implemented in Erlang, that includes CRDT's: https://gist.github.com/metadave/6024103

From @lenary

I also have ideas for a CRDT Query/Update language: https://gist.github.com/lenary/da1b98a97b43a06ecf3e

However they need some work, and after discussion with @seancribbs it seems I might take them down a completely different track. :)

russelldb commented 11 years ago

There are many ways to make a CRDT map it seems. None of them are really satisfying to me (yet). But I need to pick one and move on. If you have the time, please help.

All the maps use something like an OR-Set for keys. All Maps map keys of {name(), crdt_type()} -> crdt(crdt_type()).

Original (Carlos Map): OR-Set for keys, dict of Key->Value

Upside: pretty small. Only the keyset has tombstones.
Downside: A(update >> remove) | B(update) loses A's update on merge. This is because the remove simply drops the value at A, so B keeps all of A's updates up to the point where they diverged and re-introduces them to the value on merge. Add wins, but something is missing.

Remove Wins: dict of {name(), crdt_type()} -> {vclock(), active(), crdt(crdt_type()), tombstone() | undefined}

Upside: Semantic is pretty clear: when you remove an element we write a tombstone. On merge any updates concurrent with the remove are dropped. The field can be re-added. If it is re-removed, update the tombstone clock.
Downside: Long partition behaviour is confusing (imagine MDC write - write clusters). If a remove of a field happens on A that is concurrent with updates on B, all B's updates are dropped on merge.

Add Wins: same as above except a list of tombstones, each a {clock(), crdt()} pair

Upside: no write is ever lost, ever, ever.
Downside: fiddly, complex impl, lots of space overhead for tombstones, surprising behaviour in long partitioned replicas (i.e. if A(update >> remove >> update >> remove) | B(update >> remove >> update >> remove) field is present and merged value of all updates)

Reset-Remove: dict of {name(), crdt_type()} -> {vclock(), active(), crdt(crdt_type())} mappings

Upside: simple to understand semantic, appears most correct most often during partitioned state.
Downside: doesn't support every CRDT (i.e. G-Counter, G-Set, One Way Flags are unsupported)

Works thusly: A remove first resets the crdt value (for a counter this means issue a decrement / increment that sets the counter to zero, for a set remove all elements, for a map reset/remove all fields), then it removes the key. The value is kept as a tombstone. If the user wants to re-add the field, it is as though it were an empty field, but any concurrent updates at other replicas maintain their writes, and the value they had for the removed replica are dominated by the reset operation.

Pick one!

kuenishi commented 11 years ago

Just got curious: How can we construct a combined CRDT like {crdt_map(), crdt_map(), crdt_counter()}, where I expect each element of the tuple to be CRDT data? Theoretically it looks possible if CRDTs are not nested, like crdt_map(string(), crdt_counter()).

I ask because, from the viewpoint of ease of application development, we rarely store a single-valued object containing only a single counter; usually we store something like a user model with multiple members, like this:

-record(user, {
               name :: string(),
               email :: string(),
               total_count_liked :: non_neg_integer(),
               friends :: [ string() ], %% usually friends' user names
               ... }).

In this case a CRDT is applicable to total_count_liked and friends. Or is this a much more advanced problem to solve in the future?

russelldb commented 11 years ago

@kuenishi Unless I misunderstand your question, this composition is what the Map is for. A Map is like a JSON document (for example.) Each field in the Map is a CRDT. So there may be Maps in Maps (in Maps).

Your model would be expressed as (assuming the Key in Riak is User):

{ {name, riak_dt_lwwreg} -> lwwreg,
   {email, riak_dt_lwwreg} -> lwwreg,
   {total_count_liked, riak_dt_pncounter} -> pncounter,
   {friends, riak_dt_orset} -> orset
}

and so on. The Map is used to compose CRDTs. We can only store CRDTs in the Map, the Map's field names include the Type of the thing stored, so Ints are pncounters, strings are Last Write Wins Registers, Lists of Friends are OR-Sets, and so on.

Does that answer your question?

kuenishi commented 11 years ago

Thank you, now I get it! That's what we have been wanting.

seancribbs commented 11 years ago

@kuenishi I might mention that for the first pre-release we have decided not to allow maps-in-maps via the public API (although they are technically possible).

radix commented 10 years ago

Hi, I'm just a curious observer learning about CRDTs, but I'm confused about something.

You mention that the literature describes OR-Set as two sets of pairs of (e, unique), but the INRIA paper actually describes it as only one set of pairs of (e, unique) (see Specification 15).

Is there another definition of OR-Set that I'm missing?

Pardon my annoying question...

lenary commented 10 years ago

@radeex our two sets are essentially 1) a set of (elem, token) for additions, and 2) a set of (elem, token) for removals (copies of the ones in the addition set if something is removed). We get the exact same semantics, without having to track when things were added and removed if we do it this way, and just unioning the set on a merge. This is the simplest implementation.

I'd also point out that Spec 15 isn't state-based OR-Set, which is what we implemented, it's an op-based OR-Set. We've avoided op-based CRDTs for various reasons, essentially feeling that state-based give a simpler model for us to program with underneath.

However, it turns out @russelldb is a genius, and has worked out how to implement a set CRDT with OR-Set semantics that can track the causality of when something was added or removed well enough that we only have a single tracking "set", and merges can work out how to do the right thing. It's called riak_dt_orswot, and is in the develop branch of the basho/riak_dt repo.

russelldb commented 10 years ago

@radeex the tombstoneless set is based on this work (http://arxiv.org/pdf/1210.3368.pdf ([4] above)), in fact we've been very lucky to have Carlos Baquero work with us on the Map and Set implementation.

The confusion about the OR-Set you mention is probably, as @lenary points out, the difference between a state and op based set. The paper at [1] says:

"We leave the corresponding state-based specification as an exercise for the reader. Since
  every add is effectively unique, a state-based implementation could be based on U-Set."

The easiest way to think of this is two U-Sets, one for adds, one for removes. And a naive implementation would do just that. In practice you can keep a unique ID per actor, and increment that per add (starting to look a lot like a version vector per element), rather than store all IDs, and use a single bit to denote if an element is present or not…and keep optimising until you end up with an ORSWOT.
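For readers following along, the naive two-set version can be sketched like this. It is an illustration only: the module name is hypothetical, the unique tag is supplied by the caller (something like {Actor, Counter} would do), and the sets module from the Erlang standard library stands in for a U-Set.

```erlang
-module(naive_orset_sketch).
-export([new/0, add/3, remove/2, value/1, merge/2]).

%% State is {Adds, Removes}: two grow-only sets of {Element, Tag}.
new() -> {sets:new(), sets:new()}.

%% Every add carries a tag the caller guarantees to be unique.
add(Elem, Tag, {Adds, Removes}) ->
    {sets:add_element({Elem, Tag}, Adds), Removes}.

%% Observed remove: copy the {Elem, Tag} pairs seen locally into
%% the remove set. Adds not yet seen here are unaffected.
remove(Elem, {Adds, Removes}) ->
    Observed = sets:filter(fun({E, _Tag}) -> E =:= Elem end, Adds),
    {Adds, sets:union(Removes, Observed)}.

%% An element is present if at least one of its adds is not tombstoned.
value({Adds, Removes}) ->
    Live = sets:subtract(Adds, Removes),
    lists:usort([E || {E, _Tag} <- sets:to_list(Live)]).

%% Merge is a plain union of both sets.
merge({A1, R1}, {A2, R2}) ->
    {sets:union(A1, A2), sets:union(R1, R2)}.
```

Both sets only ever grow, so every remove leaves a tombstone behind for good. That growth is the space cost the later optimisations try to eliminate.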

radix commented 10 years ago

Okay, I was wondering if it was about the op-based vs state-based implementations. Your implementation sounds pretty clever. Thanks for the explanation!

maxsz commented 10 years ago

Hi, are you people aware of this work? http://run.unl.pt/bitstream/10362/7802/1/Sousa_2012.pdf

seancribbs commented 10 years ago

Yes.


russelldb commented 10 years ago

Hi @maxsz yes! I have read parts of Valter's dissertation (though not all yet). I've been corresponding with Valter, and Professor Nuno Preguiça (his supervisor), along with Carlos Baquero and others from the CRDT research team.

We're actually involved in SyncFree, a new 3 year research project, and hope to bring useful discoveries into Riak in the future. If you want to stay informed on that project there is a website here https://syncfree.lip6.fr/

In fact, Valter has been looking at CRDT invariants, and I believe he is using the pre5 riak release (with CRDTs) for his research at the moment. He's been on the Riak mailing list lately.

seancribbs commented 10 years ago

I think we've addressed the RFC bits of this for 2.0, closing. Reopen with comments if you disagree.