Closed. sb8244 closed this issue 5 years ago.
Hi,
I can at least explain what is going on when you do this. We implement join-decomposition to limit back-propagation.
Back-propagation is the problem that you send a delta to a node that already has it, or in the worst case, that you keep sending the same deltas when all nodes already have them. To prevent this, we decompose every delta interval we receive into its irreducible parts, and only propagate farther the parts of the delta that are new.
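To make the idea concrete, here is a minimal sketch (not DeltaCRDT's actual code; all names are hypothetical) using a grow-only set, so that each element is its own irreducible part:

```python
# Hypothetical sketch of limiting back-propagation: decompose an incoming
# delta into irreducible parts and propagate only the parts that are new
# with respect to the local state. A grow-only set keeps it simple.

def decompose(delta):
    """Split a delta (here: a set of elements) into its irreducible parts."""
    return [{e} for e in delta]

def new_parts(local_state, delta):
    """Keep only the irreducible parts the local state doesn't already contain."""
    return [part for part in decompose(delta)
            if not part <= local_state]

local = {"a", "b"}
incoming = {"b", "c", "d"}

# "b" is already known locally, so only "c" and "d" travel further.
to_propagate = new_parts(local, incoming)
print(sorted(min(p) for p in to_propagate))  # → ['c', 'd']
```

Real CRDTs have much more structure than a plain set, which is exactly why the decomposition step can become expensive.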
The problem we are seeing here is that you are adding 50k deltas to the CRDT, and it is taking the remote CRDT some time to process them. After it has processed them, it will be able to respond exactly as quickly as the local CRDT.
I'm not sure what I could offer as a tip to be honest. I have tried to get this to be as performant as possible, and maybe there are some optimizations to be found, but the reality is that you can swamp DeltaCRDT, and it is perhaps not a viable solution for every problem.
Thanks for this information.
I'm naive here, so excuse my question: is it possible to combine deltas together once they are of a certain age? Would reducing the decomposition increase unnecessary messages but improve processing of the deltas?
I've made an attempt at improving join decomposition performance. Could you test with the branch `improve_performance_join_decomposition` and report back to me?
To answer your question: all deltas are combined into "delta intervals" (a delta interval and a delta have the same shape, conveniently), but we have to break this down to figure out which deltas we can send to our neighbours. If we don't do this, we will end up sending deltas unnecessarily, which is also inefficient (and only gets worse with more member nodes) since applying deltas is also expensive.
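A tiny sketch of the "same shape" property (hypothetical, set-shaped deltas): joining buffered deltas yields a delta interval that is itself just another delta.

```python
# Hypothetical sketch: a "delta interval" is the join of several deltas and
# has the same shape as a single delta (here both are plain sets).

def join(*deltas):
    """Join a group of deltas into one delta interval."""
    interval = set()
    for d in deltas:
        interval |= d
    return interval

buffer = [{"a"}, {"b"}, {"a", "c"}]
interval = join(*buffer)
print(sorted(interval))  # → ['a', 'b', 'c']
```

Before sending, this interval still has to be decomposed again to avoid shipping parts a neighbour already has, which is the expensive step discussed in this thread.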
Hi.
Just noticed this issue. When I was implementing this for the first time I also had a performance issue in the join-decomposition computation - not sure if here we have the same problem, but maybe it can help anyways.
I'm assuming that join-decompositions are being used when a delta is received, in order to prune redundant state from this delta, i.e. line 15 of Algorithm 2 in [1], and the Δ function defined there. The Δ function iterates the join-decomposition ⇓ of the received delta d and selects which irreducibles are "new". These irreducibles are later merged with the local state in the store function (line 17).
When implementing this receive handler, I ended up implementing a delta_and_merge function that takes as input 1) the received delta d and 2) the local state, and outputs both 1) the output of Δ (so that this output is added to the delta-buffer) and 2) the updated local state.
For example, if we have an LWWMap, the delta_and_merge function will look something like this:
```erlang
%% @doc Merge two LWWMap and return the delta responsible for the inflation.
-spec delta_and_merge(state_lwwmap(), state_lwwmap()) -> {state_lwwmap(), state_lwwmap()}.
delta_and_merge({?TYPE, Remote}, {?TYPE, Local}) ->
    {Delta, CRDT} = maps:fold(
        fun(RKey, {RTS, _}=RValue, {DeltaAcc, CRDTAcc}=Acc) ->
            %% inflation: when key is there with smaller ts
            %% and when it's not
            Inflation = case maps:find(RKey, CRDTAcc) of
                {ok, {LTS, _}} -> RTS > LTS;
                error -> true
            end,
            case Inflation of
                true -> {maps:put(RKey, RValue, DeltaAcc), maps:put(RKey, RValue, CRDTAcc)};
                false -> Acc
            end
        end,
        {#{}, Local},
        Remote
    ),
    {{?TYPE, Delta}, {?TYPE, CRDT}}.
```
Here we're iterating the received delta (basically iterating its join-decomposition) and checking if each irreducible inflates the local state. We have an inflation when: 1) the key is present locally with a smaller timestamp, or 2) the key is not present locally at all.
In the end, if the output of Δ is different from bottom, we add it to the delta-buffer, with something like this:
```erlang
-spec message_handler(message(), ldb_node_id(), stored(), st()) ->
    {stored(), nothing | message()}.
message_handler({delta, N, {Type, _}=Remote}, From,
                {LocalCRDT0, DeltaBuffer0, AckMap}, _) ->
    %% compute delta and merge
    {Delta, LocalCRDT} = Type:delta_and_merge(Remote, LocalCRDT0),
    %% add to buffer
    DeltaBuffer = case Type:is_bottom(Delta) of
        true -> DeltaBuffer0;
        false -> ldb_dbuffer:add_inflation(Delta, From, DeltaBuffer0)
    end,
    Stored = {LocalCRDT, DeltaBuffer, AckMap},
    %% send ack
    Reply = {
        delta_ack,
        N
    },
    {Stored, Reply};
```
The full implementation can be found here.
Hope this helps, and let me know if there's any question.
If we strictly follow the specification in the algorithm, we can have something like: 1) compute the join-decomposition of the received delta, 2) select the irreducibles that inflate the local state, 3) merge those irreducibles into the local state, and 4) add them to the delta-buffer.
In the strategy proposed above we kind of perform the first three steps in a single iteration of the received delta.
Hi @vitorenesduarte,
As always, your feedback is appreciated!
It looks like the LWWMap here doesn't have observed removes(?). And it looks like the delta_and_merge function accepts an already-decomposed delta(?). Have these deltas never been joined? (And what do you do then when a node sends its state because it's missing some deltas that another node needs?)
I think I do have another performance issue in the part where we check whether a remove operation is an expansion, because we have to (or do, in any case) check every key to see if it has the dot we are removing. We also can't do any tricks there with ordering checks, unfortunately (but I can still bend the algorithmic complexity of the dot search from O(n) to O(1), though it'll be ugly).
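One way to picture the O(n) → O(1) trade-off mentioned above is a reverse index from dot to key (a hypothetical sketch, not DeltaCRDT's internal structure):

```python
# Hypothetical sketch: instead of scanning every key to find which one holds
# a given dot, keep a reverse index from dot to key, updated on every
# add/remove. The scan becomes a dict lookup.

class DotIndex:
    def __init__(self):
        self.key_to_dots = {}   # key -> set of dots
        self.dot_to_key = {}    # dot -> key (the "ugly" but fast part)

    def add(self, key, dot):
        self.key_to_dots.setdefault(key, set()).add(dot)
        self.dot_to_key[dot] = key

    def remove_dot(self, dot):
        # O(1): find the key holding this dot without scanning all keys
        key = self.dot_to_key.pop(dot, None)
        if key is not None:
            dots = self.key_to_dots[key]
            dots.discard(dot)
            if not dots:
                del self.key_to_dots[key]
        return key

idx = DotIndex()
idx.add("x", ("A", 1))
idx.add("y", ("A", 2))
print(idx.remove_dot(("A", 2)))  # → y
```

The cost is the extra memory and bookkeeping for the second map, which is the "ugly" part of the trade.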
Hi @derekkraan!
Thanks! I was really busy with deadlines until last week, but I'll try to be of more help from now on.
It looks like the LWWMap here doesn't have observed removes(?)
Yes, this is a very simple LWWMap (grow-only) and does not support removes. The internal state is a map from key to a tuple containing the timestamp and the value. I gave that as an example just to demonstrate the technique I ended up implementing (compute Δ and merge in a single pass), as opposed to a more direct interpretation of what's in the paper.
And it looks like the delta_and_merge function accepts an already-decomposed delta(?) Have these deltas never been joined?
It accepts the received delta-group (the join of all deltas in the delta-buffer) as first argument, but because the data type is really simple, iterating the received delta-group looks like iterating its decomposition.
(And what do you do then when a node sends its state because it's missing some deltas that another node needs?)
Is this related with #13?
Now, if we have a more complex CRDT, like an AWSet, maybe we need to distinguish a delta from the CRDT state. In abstract they are the same, but in order to have an efficient implementation, we may need to have different encodings. As we have discussed previously, in abstract the delta resulting from a remove is simply the active dots, but an efficient implementation may want to also save the element in the delta to enable a fast lookup when joining.
I'll try to write something on that.
Hi @vitorenesduarte,
Thanks for the quick response.
an efficient implementation may want to also save the element in the delta to enable a fast lookup when joining.
I have already implemented this, to enable efficient joins. I track the keys in the DotMap (it's a little bit ugly but :man_shrugging:).
The problem is that if there is a net split, the neighbours think a node has gone away, so they forget about this node. In the meantime, new deltas are added. When the node rejoins, it is missing deltas, so it gets the whole state sent to it, but due to state causal context compression, any removes that have happened in this period are lost, breaking our CRDT.
I think the solution will be detecting that a node has missed some deltas, and forcing that node to push any remaining deltas it has and start over fresh (then it will initialize its state properly with the whole-state fallback of all neighbours).
If you have any thoughts on this then a comment on #13 would be much appreciated! Aside from performance issues (which are annoying), this is the only area where I'm aware that this library has a real deficiency.
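For illustration only (this is not the library's protocol), the detection described in the previous comment could be shaped like a per-neighbour sequence counter, where a gap triggers a full-state resync:

```python
# Illustrative sketch only: detect a gap in per-neighbour delta sequence
# numbers; on a gap, signal that a full-state resync is needed.

class DeltaReceiver:
    def __init__(self):
        self.last_seq = {}  # neighbour -> last sequence number applied

    def on_delta(self, neighbour, seq):
        expected = self.last_seq.get(neighbour, 0) + 1
        if seq != expected:
            # We missed deltas (e.g. during a netsplit): fall back to
            # requesting the neighbour's whole state.
            return "resync"
        self.last_seq[neighbour] = seq
        return "apply"

r = DeltaReceiver()
print(r.on_delta("node_b", 1))  # → apply
print(r.on_delta("node_b", 2))  # → apply
print(r.on_delta("node_b", 5))  # → resync
```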
I was suggesting the same story, but to implement an efficient delta_and_merge method.
For an AWSet, we would generate as deltas `{ e: {dot: TAG} }`, and this TAG could be (for example):

- `False` for additions
- `True` for removals

With this, I believe it should be possible to have a single-pass handler of remote deltas.
Here's how it could look in Python:
```python
#!/usr/bin/env python3


class AWSet:
    ADD_TAG = False
    RMV_TAG = True

    def __init__(self):
        """
        An AWSet has:
        - a map from element to the set of active dots
        - a causal context

        We will represent this causal context as a map from node identifier to
        the set of all sequences, but in practice we want to have a more
        efficient representation of this data structure.

        For example, given two nodes, a and b, if we have as causal context:

            {
              a: [1, 2, 3],
              b: [1, 2, 4, 5]
            }

        this could be represented as:

            {
              a: (3, []),
              b: (2, [4, 5])
            }

        Each node identifier is mapped to a pair where the first component is
        the highest sequence seen, and the second component is the remaining
        sequences that are not summarized by the left component.
        """
        self.elems = {}
        self.cc = {}

    def __repr__(self):
        return "Elems: " + str(self.elems) + " | CC: " + str(self.cc)

    def next_dot(self, i):
        """
        Return the next dot, and also add it to the causal context.
        """
        # compute the next sequence and store it in the causal context
        if i in self.cc:
            next_seq = max(self.cc[i]) + 1
            self.cc[i].add(next_seq)
        else:
            next_seq = 1
            self.cc[i] = set([next_seq])
        # compute dot
        dot = (i, next_seq)
        return dot

    def add_dot(self, dot):
        """
        Add this dot to the causal context.
        """
        j, seq = dot
        if j in self.cc:
            self.cc[j].add(seq)
        else:
            self.cc[j] = set([seq])

    def add(self, i, e):
        """
        Add a new element to the set.
        This method returns a delta, and incorporates this update in the AWSet.
        """
        # compute dot
        dot = self.next_dot(i)
        # retrieve active dots
        active_dots = self.elems.get(e, set())
        # update elems
        self.elems[e] = set([dot])
        # the delta is a map from element to a map with:
        # - the new dot tagged as ADD
        # - each active dot tagged as RMV
        tagged_dots = {d: AWSet.RMV_TAG for d in active_dots}
        tagged_dots[dot] = AWSet.ADD_TAG
        delta = {e: tagged_dots}
        return delta

    def remove(self, i, e):
        """
        Remove an element from the set.
        This method returns a delta, and incorporates this update in the AWSet.
        """
        # remove elem from map while retrieving the active dots
        active_dots = self.elems.pop(e, set())
        # the delta is a map from element to a map with:
        # - each active dot tagged as RMV
        tagged_dots = {d: AWSet.RMV_TAG for d in active_dots}
        delta = {e: tagged_dots}
        return delta

    def __merge_tags(tag_a, tag_b):
        """
        Return the logical or of tags.
        """
        return tag_a or tag_b

    def __merge_dots_into(dots_a, dots_b):
        """
        This method merges dots_a into (possibly) bigger dots_b.
        """
        # for each dot
        for dot, tag_a in dots_a.items():
            # get tags
            # - by default ADD
            tag_b = dots_b.get(dot, AWSet.ADD_TAG)
            # merge tags
            tag = AWSet.__merge_tags(tag_a, tag_b)
            # store tag
            dots_b[dot] = tag
        return dots_b

    def __merge_delta_into(delta_a, delta_b):
        """
        This method merges delta_a into (possibly) bigger delta_b.
        """
        # for each element
        for e, dots_a in delta_a.items():
            # get dots
            # - by default empty
            dots_b = delta_b.get(e, {})
            # merge dots
            dots = AWSet.__merge_dots_into(dots_a, dots_b)
            # store dots
            delta_b[e] = dots
        return delta_b

    def merge_deltas(*deltas):
        """
        This method returns the merge of all deltas in a list.
        """
        # if no deltas, return the empty delta
        if len(deltas) == 0:
            return {}
        # initial result is the first delta
        result = deltas[0]
        # for each remaining delta
        for delta in deltas[1:]:
            # merge it into result
            result = AWSet.__merge_delta_into(delta, result)
        return result

    def __is_addition(tag):
        """
        We have an addition when the tag is False.
        """
        return not tag

    def delta_and_merge(self, delta):
        """
        Merge received delta group into AWSet, and return the state responsible
        for the inflation.
        """
        # the state responsible for the inflation
        inflation_state = {}
        # for each element and dot in the received delta
        for e, dots in delta.items():
            if e in self.elems:
                # if element already exists
                active_dots = self.elems.get(e)
                tagged_dots = {}
                for dot, tag in dots.items():
                    # add dot to causal context
                    self.add_dot(dot)
                    # we have an inflation if the dot is tagged with:
                    # - ADD and the dot is unknown
                    # - RMV and the dot is known
                    is_addition = AWSet.__is_addition(tag)
                    is_known = dot in active_dots
                    if is_addition:
                        if not is_known:
                            # add dot
                            active_dots.add(dot)
                            # save into inflation state
                            tagged_dots[dot] = tag
                    else:
                        if is_known:
                            # remove dot
                            active_dots.remove(dot)
                            # save into inflation state
                            tagged_dots[dot] = tag
                # either update dots or remove element if all dots were removed
                if len(active_dots) > 0:
                    self.elems[e] = active_dots
                else:
                    self.elems.pop(e)
                # maybe store tagged dots
                if len(tagged_dots) > 0:
                    inflation_state[e] = tagged_dots
            else:
                # if element doesn't exist
                active_dots = set([])
                tagged_dots = {}
                for dot, tag in dots.items():
                    # add dot to causal context
                    self.add_dot(dot)
                    # we have an inflation if the dot is tagged with ADD
                    is_inflation = AWSet.__is_addition(tag)
                    if is_inflation:
                        # add new dot
                        active_dots.add(dot)
                        # save into inflation state
                        tagged_dots[dot] = tag
                # update dots
                if len(active_dots) > 0:
                    self.elems[e] = active_dots
                # maybe store tagged dots
                if len(tagged_dots) > 0:
                    inflation_state[e] = tagged_dots
        return inflation_state


def main():
    # two nodes, A and B
    set_a = AWSet()
    set_b = AWSet()

    print("> A adds hello")
    delta_a1 = set_a.add("A", "hello")
    print("A:", set_a)
    print("delta:", delta_a1)
    print()

    print("> A adds hello")
    delta_a2 = set_a.add("A", "hello")
    print("A:", set_a)
    print("delta:", delta_a2)
    print()

    print("> A adds foo")
    delta_a3 = set_a.add("A", "foo")
    print("A:", set_a)
    print("delta:", delta_a3)
    print()

    print("> B adds hello")
    delta_b1 = set_b.add("B", "hello")
    print("> B adds and removes bar")
    delta_b2 = set_b.add("B", "bar")
    delta_b3 = set_b.remove("B", "bar")
    print("B:", set_b)
    print()

    print("> merge all deltas from A")
    deltas_a = AWSet.merge_deltas(delta_a1, delta_a2, delta_a3)
    print("delta_A:", deltas_a)
    print()

    print("> merge all deltas from B")
    deltas_b = AWSet.merge_deltas(delta_b1, delta_b2, delta_b3)
    print("delta_B:", deltas_b)
    print()

    print("> merge delta_A into A")
    inflation_state = set_a.delta_and_merge(deltas_a)
    print("new:", inflation_state)
    print("A:", set_a)
    print()

    print("> merge delta_B into A")
    inflation_state = set_a.delta_and_merge(deltas_b)
    print("new:", inflation_state)
    print("A:", set_a)
    print()

    print("> merge delta_B into A again")
    inflation_state = set_a.delta_and_merge(deltas_b)
    print("new:", inflation_state)
    print("A:", set_a)
    print()

    print("> merge delta_B into B")
    inflation_state = set_b.delta_and_merge(deltas_b)
    print("new:", inflation_state)
    print("B:", set_b)
    print()

    print("> merge delta_A into B")
    inflation_state = set_b.delta_and_merge(deltas_a)
    print("new:", inflation_state)
    print("B:", set_b)
    print()

    print("> merge delta_a into B again")
    inflation_state = set_b.delta_and_merge(deltas_a)
    print("new:", inflation_state)
    print("B:", set_b)
    print()


if __name__ == "__main__":
    main()
```
This outputs:
```
> A adds hello
A: Elems: {'hello': {('A', 1)}} | CC: {'A': {1}}
delta: {'hello': {('A', 1): False}}

> A adds hello
A: Elems: {'hello': {('A', 2)}} | CC: {'A': {1, 2}}
delta: {'hello': {('A', 1): True, ('A', 2): False}}

> A adds foo
A: Elems: {'hello': {('A', 2)}, 'foo': {('A', 3)}} | CC: {'A': {1, 2, 3}}
delta: {'foo': {('A', 3): False}}

> B adds hello
> B adds and removes bar
B: Elems: {'hello': {('B', 1)}} | CC: {'B': {1, 2}}

> merge all deltas from A
delta_A: {'hello': {('A', 1): True, ('A', 2): False}, 'foo': {('A', 3): False}}

> merge all deltas from B
delta_B: {'hello': {('B', 1): False}, 'bar': {('B', 2): True}}

> merge delta_A into A
new: {}
A: Elems: {'hello': {('A', 2)}, 'foo': {('A', 3)}} | CC: {'A': {1, 2, 3}}

> merge delta_B into A
new: {'hello': {('B', 1): False}}
A: Elems: {'hello': {('B', 1), ('A', 2)}, 'foo': {('A', 3)}} | CC: {'A': {1, 2, 3}, 'B': {1, 2}}

> merge delta_B into A again
new: {}
A: Elems: {'hello': {('B', 1), ('A', 2)}, 'foo': {('A', 3)}} | CC: {'A': {1, 2, 3}, 'B': {1, 2}}

> merge delta_B into B
new: {}
B: Elems: {'hello': {('B', 1)}} | CC: {'B': {1, 2}}

> merge delta_A into B
new: {'hello': {('A', 2): False}, 'foo': {('A', 3): False}}
B: Elems: {'hello': {('B', 1), ('A', 2)}, 'foo': {('A', 3)}} | CC: {'B': {1, 2}, 'A': {1, 2, 3}}

> merge delta_a into B again
new: {}
B: Elems: {'hello': {('B', 1), ('A', 2)}, 'foo': {('A', 3)}} | CC: {'B': {1, 2}, 'A': {1, 2, 3}}
```
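As an aside, the compressed causal-context representation described in the AWSet docstring above could be sketched like this (illustrative only):

```python
# Sketch of the compressed causal-context representation from the AWSet
# docstring: map each node id to (highest contiguous sequence, remaining
# non-contiguous sequences).

def compress(cc):
    compressed = {}
    for node, seqs in cc.items():
        highest = 0
        rest = sorted(seqs)
        # absorb the contiguous prefix 1, 2, 3, ... into `highest`
        while rest and rest[0] == highest + 1:
            highest = rest.pop(0)
        compressed[node] = (highest, rest)
    return compressed

cc = {"a": {1, 2, 3}, "b": {1, 2, 4, 5}}
print(compress(cc))  # → {'a': (3, []), 'b': (2, [4, 5])}
```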
Hey @derekkraan I'm testing out that branch.
My initial thought is that the add speed is much slower in the performance branch. Where it used to take ~3s to add 50k entries, I'm about 10k entries in at 30 seconds now.
Hi @sb8244,
I will take another look at it. I'm starting to get the feeling that these optimizations are leaking through the implementation layers, and it might be better to do an overhaul so that more of the code is specific to the final CRDT I'm building. So I'm not sure if there will be a quick fix, but I'm looking into it / working on it.
Derek
Thanks for your time on this @derekkraan. I don't think I can help with the CRDT much, but I'm happy to help with a small performance suite if necessary. I found another solution for the time being since I don't really need the guarantees of a CRDT for my problem.
I've closed #15 in favour of #16. It should be much improved. I've also added a small benchmark that I've been using in various forms to compare the performance of this branch to master. If you had some time / wanted to put together a small performance suite, I would definitely appreciate that. I'll probably look at this myself more in depth in the coming days, but if you had ideas then I would definitely like to hear them.
I've just merged a major overhaul that should improve performance quite a bit (especially where removes are concerned). I'm closing this issue (but please open a new issue if you find any other performance issues or have ideas about how to optimize further).
Thanks for this library, super cool. I'm working towards using it and testing it out with some example scenarios.
I have the basics working by using a node monitor to handle connect/disconnect between cluster nodes. This means that a joined node will automatically get the AWLWWMap.
I'm doing code that looks like this:
The read is instant on node 1, but times out after 30s on node 2. It will eventually catch up, but may take 1-2 minutes. It seems to be spending most of its time in join decomposition.
Do you have any tips for this?