phoenixframework / phoenix_pubsub

Distributed PubSub and Presence platform for the Phoenix Framework
http://www.phoenixframework.org/
MIT License

Race condition causing data inconsistency when nodes are coming up #148

Open indrekj opened 4 years ago

indrekj commented 4 years ago

I think I found a race condition that is causing invalid data.

Version: latest master (7893228).

Some background: we use a slightly modified version of phoenix_pubsub with some performance optimizations (one of them is also up as a PR here; we additionally added a tag-lookup ETS table to speed up delta merges). We encountered a shard crash that seemed to be caused by our modifications. After I managed to write a failing test, I noticed that the bug also exists on the original branch (there it does not crash the shard, but causes data inconsistency instead).

For us, this seems to happen when there's a network partition or when Kubernetes decides to move or add some pods.

This is really hard to replicate in the real world. It usually happens for us maybe once a month.

Scenario (same as in the test, but in words):

Failing test:

test "delta before transfer from a different node", config do
  a = new(:a, config)
  b = new(:b, config)
  {a, _, _} = State.replica_up(a, b.replica)
  {b, _, _} = State.replica_up(b, a.replica)

  alice = new_pid()

  # Alice joins Node A
  a = State.join(a, alice, "lobby", :alice, "initial")

  # Node A sends updates to node B
  assert {b, [{{_, _, :alice}, _, _}], _} = State.merge(b, State.extract(a, b.replica, b.context))
  assert [:alice] = b |> State.online_list() |> keys()
  a = State.reset_delta(a)

  # Alice is updated first time
  a = State.leave(a, alice, "lobby", :alice)
  a = State.join(a, alice, "lobby", :alice, "update1")

  # update1 is not received by Node B (because of network delay or network
  # partition) or is received a lot later
  a = State.reset_delta(a)

  # Node C comes up
  c = new(:c, config)
  {b, _, _} = State.replica_up(b, c.replica)
  {a, _, _} = State.replica_up(a, c.replica)
  {c, _, _} = State.replica_up(c, a.replica)
  {c, _, _} = State.replica_up(c, b.replica)

  # Alice is updated second time
  a = State.leave(a, alice, "lobby", :alice)
  a = State.join(a, alice, "lobby", :alice, "second")

  # Let's assume Node C also sent out a transfer_req to Node B here, but Node C
  # receives the delta heartbeat from Node A first.
  assert {c, [{{_, _, :alice}, "second", _}], []} = State.merge(c, a.delta)

  # Here everything is fine. Node C sees the latest alice.
  assert [
    {{"lobby", _, :alice}, "second", _}
  ] = c |> State.online_list()

  # Now Node C receives transfer ack from B (who has alice with one missed update)
  assert {c, _, _} = State.merge(c, State.extract(b, c.replica, c.context))
  assert [
    {{"lobby", _, :alice}, "second", {{:a, 1}, 2}}
  ] = c |> State.online_list()
  # ^ This fails because the most recent alice is overwritten with the old
  # alice (who now has "initial" in the meta)

  # Let's say we ignore the previous inconsistency and wait for the transfer ack
  # from Node A as well
  assert {c, _, _} = State.merge(c, State.extract(a, c.replica, c.context))
  assert [
    {{"lobby", _, :alice}, "second", _}
  ] = c |> State.online_list()
  # ^ This still fails - now there is no alice online at all
end
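To make the expected behavior in the assertions above concrete, here is a minimal, hypothetical model of the merge rule the test exercises. This is NOT phoenix_pubsub's actual structs or API, just a sketch of dot-versioned merging: an incoming element tagged with a dot the receiver's causal context already covers is stale and must not displace a newer local element.

```elixir
# Hypothetical sketch, NOT phoenix_pubsub's real data structures.
# Each value is tagged with a dot {replica, clock}; each state carries a
# causal context mapping replica -> highest clock observed from it.
defmodule DotMergeSketch do
  def seen?(ctx, {replica, clock}), do: Map.get(ctx, replica, 0) >= clock

  # Resolve one key during a merge of two states.
  def resolve(local, local_ctx, remote, remote_ctx) do
    case {local, remote} do
      # Only the remote side has the key: take it unless our context has
      # already seen (and therefore deliberately removed) that dot.
      {nil, {dot, _} = entry} ->
        if seen?(local_ctx, dot), do: nil, else: entry

      # Only we have the key: keep it unless the remote has seen our dot,
      # which means the element was removed there.
      {{dot, _} = entry, nil} ->
        if seen?(remote_ctx, dot), do: nil, else: entry

      # Both have the key: the dot already covered by the other side's
      # context is the stale one.
      {{ldot, _} = lentry, {rdot, _} = rentry} ->
        cond do
          seen?(local_ctx, rdot) and not seen?(remote_ctx, ldot) -> lentry
          seen?(remote_ctx, ldot) and not seen?(local_ctx, rdot) -> rentry
          # Concurrent writes: a real implementation breaks the tie
          # deterministically; keeping local is enough for this sketch.
          true -> lentry
        end
    end
  end
end

# Node C holds alice at dot {:a, 3} ("second"), and its context covers {:a, 3}.
# Node B's transfer ack carries alice at the stale dot {:a, 1} ("initial"),
# with B's context covering only {:a, 1}. The newer local entry must win:
{{:a, 3}, "second"} =
  DotMergeSketch.resolve({{:a, 3}, "second"}, %{a: 3}, {{:a, 1}, "initial"}, %{a: 1})
```

In the failing test, the merge behaves as if the last clause ran in reverse: B's stale alice replaces C's fresh one even though C's context should already cover B's dot.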

Related commit: https://github.com/salemove/phoenix_pubsub/commit/fdfe57c

Note: as this is quite complex to replicate in the real world, I cannot be 100% sure that my test reflects exactly what is happening. I'm fairly certain that values are being overwritten, though, because I changed this line to use true = :ets.insert_new and it raised an error when new pods came up (it took 2 weeks to catch that, however).
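For context on that debugging trick: :ets.insert/2 silently overwrites an existing key, while :ets.insert_new/2 refuses the write and returns false, so pattern-matching the result against true turns a silent overwrite into a MatchError that shows up in logs. A small standalone demonstration (table and key names are made up, not phoenix_pubsub's):

```elixir
# Hypothetical table; the names are illustrative only.
table = :ets.new(:presence_values, [:set, :public])

# The first write for a key succeeds.
true = :ets.insert_new(table, {:alice, "initial"})

# A second write for the same key is rejected instead of overwriting...
false = :ets.insert_new(table, {:alice, "update1"})

# ...so `true = :ets.insert_new(...)` would raise a MatchError at that point,
# flagging the exact moment an unexpected overwrite happens.
[{:alice, "initial"}] = :ets.lookup(table, :alice)
```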

If my assumptions and the test case are correct, I still don't have a good idea how to fix it...