derekkraan / delta_crdt_ex

Use DeltaCrdt to build distributed applications in Elixir
MIT License

delta_crdt appears to lose data when running on more than one node. #65

Open bokner opened 2 years ago

bokner commented 2 years ago

The following module simulates a scenario with several concurrent processes writing to the CRDT at a constant rate. It looks like, when running the script on 2 or more nodes with the interval between writes close to `sync_interval`, some keys will often be lost.

```elixir
defmodule DeltaCrdt.Test do
  @crdt_test :crdt_test

  def init(sync_interval \\ 200) do
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: sync_interval, name: @crdt_test)

    :net_adm.world()
    |> Enum.reject(fn node -> node == Node.self() end)
    |> set_neighbours(crdt)
  end

  def run(opts \\ []) do
    write_to_crdt(
      Keyword.get(opts, :processes, 10),
      Keyword.get(opts, :requests_per_process, 1000),
      Keyword.get(opts, :interval, 200)
    )
  end

  def crdt_length() do
    length(Map.keys(DeltaCrdt.to_map(:crdt_test)))
  end

  def stop() do
    Process.exit(:erlang.whereis(:crdt_test), :normal)
  end

  defp set_neighbours(nodes, crdt) do
    DeltaCrdt.set_neighbours(crdt,
      Enum.map(nodes, fn node -> {@crdt_test, node} end)
    )
  end

  ## `process_num` processes write `requests_per_process` times to the CRDT every `interval` milliseconds
  defp write_to_crdt(process_num, requests_per_process, interval) do
    Enum.each(1..process_num, fn _process_id ->
      Task.async(fn -> write(requests_per_process, interval) end)
    end)
  end

  defp write(requests_per_process, interval) do
    Enum.each(1..requests_per_process, fn _request_id ->
      DeltaCrdt.put(
        @crdt_test,
        :erlang.make_ref(),
        DateTime.to_unix(DateTime.utc_now(), :millisecond)
      )

      :timer.sleep(interval)
    end)
  end
end
```

Steps to reproduce the issue (you might need to run it several times):

  1. Open 2 IEx sessions in separate terminals:

     iex --sname node1 --cookie delta_crdt -S mix
     iex --sname node2 --cookie delta_crdt -S mix

  2. In both IEx sessions:

     import DeltaCrdt.Test
     ## This will initialize the CRDT on the node with sync_interval = 200 and then run 10 concurrent processes,
     ## each writing 1000 times, with 100 ms between writes.
     init(200); run(interval: 100)

  3. Wait a couple of minutes and then check the number of records in the CRDT:

     crdt_length()

     The expected value is 20000 (that is, 10 * 1000 * 2), but you will likely see a smaller number, which indicates that some records were lost.

You will likely get 20000 if you run with a much larger interval, e.g.

run(interval: 500)

but it gets worse with 3 or more nodes: there, even a much larger interval does not prevent occasional loss of records.

Note: If you want to run multiple tests within the same IEx session, you can kill the current CRDT by running:

stop()

derekkraan commented 2 years ago

Hi @bokner, I tried running this with 3 nodes a few times and got 30k keys in the CRDT. Also tried with 2 nodes and got 20k. Am I missing something?

bokner commented 2 years ago

Thanks @derekkraan, I'm still able to consistently reproduce it.

I changed the code and steps to reproduce a bit to make it easier to run:

```elixir
defmodule DeltaCrdt.Test do
  @crdt_test :crdt_test

  require Logger
  def init(sync_interval \\ 200) do
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: sync_interval, name: @crdt_test)

    :net_adm.world()
    |> Enum.reject(fn node -> node == Node.self() end)
    |> set_neighbours(crdt)
  end

  def run(opts \\ []) do
    init(Keyword.get(opts, :sync_interval, 200))
    write_to_crdt(
      Keyword.get(opts, :processes, 10),
      Keyword.get(opts, :requests_per_process, 1000),
      Keyword.get(opts, :write_interval, 200)
    )
    Logger.info("Records: #{crdt_length()}")
  end

  def crdt_length() do
    length(Map.keys(DeltaCrdt.to_map(:crdt_test)))
  end

  def stop() do
    Process.exit(:erlang.whereis(:crdt_test), :normal)
  end

  defp set_neighbours(nodes, crdt) do
    DeltaCrdt.set_neighbours(crdt,
      Enum.map(nodes, fn node -> {@crdt_test, node} end)
    )
  end

  ## `process_num` processes write `requests_per_process` times to the CRDT every `write_interval` milliseconds
  defp write_to_crdt(process_num, requests_per_process, write_interval) do
    1..process_num
    |> Task.async_stream(fn _i -> write(requests_per_process, write_interval) end,
      max_concurrency: process_num,
      timeout: :infinity
    )
    |> Enum.to_list()
  end

  defp write(requests_per_process, write_interval) do
    Enum.each(1..requests_per_process, fn _request_id ->
      DeltaCrdt.put(
        @crdt_test,
        :erlang.make_ref(),
        DateTime.to_unix(DateTime.utc_now(), :millisecond)
      )

      :timer.sleep(write_interval)
    end)
  end
end
```

Steps to reproduce the issue (you might need to run it several times):

  1. Open 2 IEx sessions in separate terminals:

     iex --sname node1 --cookie delta_crdt -S mix
     iex --sname node2 --cookie delta_crdt -S mix

  2. In both IEx sessions, with only a small interval between the calls:

     ## This will initialize the CRDT on the node with sync_interval = 200 and then run 10 concurrent processes,
     ## each writing 1000 times, with 100 ms between writes.
     DeltaCrdt.Test.run(write_interval: 100, sync_interval: 200)

     Note: this is now a blocking call, so it will take some time depending on the value of `write_interval` and how many processes you run (default is 10).

  3. After the simulation is done on each node, it will output the number of records in the CRDT.
     Wait until all nodes finish, then call:

     ```elixir
     DeltaCrdt.Test.crdt_length()
     ```

     on each node.

bokner commented 2 years ago

I was wondering if the Erlang/Elixir version and/or VM args could make a difference. Mine are:

Erlang/OTP 24 [erts-12.0.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit] [dtrace]
Interactive Elixir (1.13.1) - press Ctrl+C to exit (type h() ENTER for help)
derekkraan commented 2 years ago

Hi, thanks again. I was able to reproduce it this time. I think I know what is happening as well.

In the CRDT, we maintain a list of "dots", basically tracking who has seen what. This is simplified to just track the last seen dot. However, we are also sending partial diffs. So this means that sometimes we might send an update saying "I know of all updates up to X, and that doesn't include key Y". The receiving process thinks "ok, he knows I added Y, he has also seen all my updates including the addition of Y, and he is saying that it's not there anymore, so it must be gone" and deletes Y.

Not sure yet about the solution. I will probably have to stew on this one.
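
To make the failure mode concrete, here is a minimal, self-contained sketch. These are not delta_crdt's actual data structures: the map shapes, the `PhantomRemovalSketch` name, and the merge rule are simplified assumptions that only illustrate "an entry whose dot is covered by the sender's context but absent from the sender's diff is treated as removed":

```elixir
defmodule PhantomRemovalSketch do
  # Receiver state and sync payload are plain maps (simplified assumption):
  #   %{entries: %{key => dot}, context: MapSet of dots}
  # Merge rule being illustrated: an entry the receiver holds whose dot is
  # covered by the sender's context, but which is missing from the sender's
  # diff, is interpreted as having been removed by the sender.
  def merge(receiver, %{entries: diff_entries, context: sender_context}) do
    kept =
      receiver.entries
      |> Enum.reject(fn {key, dot} ->
        MapSet.member?(sender_context, dot) and not Map.has_key?(diff_entries, key)
      end)
      |> Map.new()

    %{receiver | entries: Map.merge(kept, diff_entries)}
  end
end

# Node B added :y with dot {:b, 1}; node A has already observed that dot.
b = %{entries: %{y: {:b, 1}}, context: MapSet.new([{:b, 1}])}

# A's sync is truncated: its context covers {:b, 1}, but the entry for :y is missing.
truncated_sync_from_a = %{entries: %{}, context: MapSet.new([{:b, 1}])}

PhantomRemovalSketch.merge(b, truncated_sync_from_a).entries
# => %{} -- :y is dropped even though no node ever removed it
```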

bokner commented 2 years ago

Thanks @derekkraan! You can count on me to test any changes that might fix this issue 😄

hubertlepicki commented 2 years ago

@derekkraan could this affect Horde.Registry in a way that multiple nodes could register processes under the same key, and on occasion it wouldn't detect the conflict?

derekkraan commented 2 years ago

@hubertlepicki I don't believe so. The CRDT still converges, but some data might be missing. If you think it is though, might be worth investigating. I don't have a solution to this issue in mind yet.

bokner commented 2 years ago

I've done a lot of testing with a sufficiently large `max_sync_size` and/or `max_sync_size` set to `:infinite`. Not once have I lost any data. I have a hunch that the data loss is due to truncation of diffs in the merkle_map code. If I read the code correctly, the truncation happens when the size of the diff exceeds `max_sync_size`.
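
For reference, a minimal sketch of the start options used in that test; the `max_sync_size` option and the `:infinite` value are as mentioned above, and the rest mirrors the test module:

```elixir
# As in the test described above: start the CRDT with the sync-size cap disabled.
{:ok, crdt} =
  DeltaCrdt.start_link(DeltaCrdt.AWLWWMap,
    sync_interval: 200,
    max_sync_size: :infinite,
    name: :crdt_test
  )
```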

derekkraan commented 2 years ago

@bokner this is also the direction I was thinking in. I am still not sure how to mitigate it; perhaps "just send everything every time" is an option. Another possible option is to send only the "dots" that relate to the keys being sent in an update. I'm not sure how to calculate that, though, and the fix for this issue has unfortunately made it to the back burner.

derekkraan commented 2 years ago

The issue is: we send a partial update, but we send all "dots" (which are like a record of what we have seen) with that update. Then we get the dots back from the remote node, but without any keys, leading the first node to conclude that the second node has removed the keys, when in fact the keys were never transmitted. So the solution is likely to make sure that we send only the dots that are relevant to the update being sent. I am not sure how computationally intensive that is; I haven't given it a shot yet.
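
A rough sketch of that direction, with hypothetical names and shapes (this is not delta_crdt's actual internal structure): restrict the causal context shipped with a partial sync to the dots of the keys actually included in it.

```elixir
# Hypothetical sketch, not delta_crdt's internals: compute the causal context
# for a partial sync from only the keys that made it into the (possibly
# truncated) diff, instead of sending the full set of seen dots.
defmodule PartialSyncSketch do
  # `entries` is assumed to map key => {node, counter} (the dot that created it);
  # `keys_in_diff` is the subset of keys selected for this sync.
  def context_for_diff(entries, keys_in_diff) do
    entries
    |> Map.take(keys_in_diff)
    |> Map.values()
    |> MapSet.new()
  end
end

# Only the dot for :a travels with a diff that contains only :a,
# so the receiver cannot mistake the missing :b for a removal.
PartialSyncSketch.context_for_diff(%{a: {:n1, 1}, b: {:n1, 2}}, [:a])
# => MapSet.new([{:n1, 1}])
```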