bitwalker / swarm

Easy clustering, registration, and distribution of worker processes for Erlang/Elixir
MIT License

No synchronization between my nodes #65

Closed FabienHenon closed 6 years ago

FabienHenon commented 6 years ago

Hi,

First, I'd like to thank you for this hard work :)

I tried to use Swarm in my own project. I'm starting processes from my supervisor and I'd like them to be distributed across connected nodes.

This is the code I use to start new workers in the swarm:

defmodule ModNotifications.Workers.Sender do
  # Automatically defines child_spec/1
  use DynamicSupervisor

  def start_link(arg) do
    DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    DynamicSupervisor.init(strategy: :one_for_one, max_children: :infinity)
  end

  def schedule(delay, sender) do
    name = UUID.uuid4()

    # Register the worker via Swarm: Swarm picks a node, invokes
    # register/2 below on it, and returns the pid of the started worker.
    {:ok, pid} =
      Swarm.register_name(name, __MODULE__, :register, [
        name,
        {ModNotifications.Workers.Delayer, %{delay: delay, sender: sender}}
      ])

    # Add the worker to the :notification_sender group so it can be
    # looked up with Swarm.members/1.
    Swarm.join(:notification_sender, pid)
  end

  def register(name, {mod, data}) do
    DynamicSupervisor.start_child(
      __MODULE__,
      {mod, {name, data}}
    )
  end
end

This is just a DynamicSupervisor started at app startup. When I need a new worker I call the schedule function with its parameters, which then starts a new worker.
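For illustration, a call looks like this (the map is only a placeholder for the real sender payload that NotificationCenter.send/1 expects in my app):

# Schedule a notification to be sent in 14 minutes; the second argument
# stands in for the actual sender payload (placeholder values).
ModNotifications.Workers.Sender.schedule(14, %{user_id: 42, body: "hello"})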

The code for the worker is pretty simple:

defmodule ModNotifications.Workers.Delayer do
  use GenServer, restart: :temporary

  alias ModNotifications.Workers.Sender

  require Logger

  def start_link({name, state}) do
    Logger.warn("START LINK #{inspect(name)} #{inspect(state)}")
    GenServer.start_link(__MODULE__, state)
  end

  def init(%{sender: _, delay: delay_time} = state) do
    Logger.warn("INIT DELAYER #{inspect(state)}")
    delay(delay_time)

    {:ok, state}
  end

  def init(_), do: :ignore

  # called when a handoff is initiated because of a change in cluster
  # topology; valid responses are `:restart`, `{:resume, state}` or `:ignore`
  def handle_call({:swarm, :begin_handoff}, _from, state) do
    Logger.warn("BEGIN_HANDOFF #{inspect(state)}")
    {:reply, {:resume, state}, state}
  end

  # called after the process has been restarted on its new node,
  # and the old process' state is being handed off. This is only
  # sent if the return to `begin_handoff` was `{:resume, state}`.
  # **NOTE**: This is called *after* the process is successfully started,
  # so make sure to design your processes around this caveat if you
  # wish to hand off state like this.
  def handle_cast({:swarm, :end_handoff, state}, old_state) do
    Logger.warn("END_HANDOFF #{inspect(state)}, old state : #{inspect(old_state)}")
    {:noreply, state}
  end

  # called when a network split is healed and the local process
  # should continue running, but a duplicate process on the other
  # side of the split is handing off its state to us. You can choose
  # to ignore the handoff state, or apply your own conflict resolution
  # strategy
  def handle_cast({:swarm, :resolve_conflict, resolve_state}, state) do
    Logger.warn("RESOLVE_CONFLICT #{inspect(resolve_state)}, state : #{inspect(state)}")
    {:noreply, state}
  end

  # this message is sent when this process should die
  # because it is being moved, use this as an opportunity
  # to clean up
  def handle_info({:swarm, :die}, state) do
    Logger.warn("DIE #{inspect(state)}")
    {:stop, :shutdown, state}
  end

  def handle_info(:send, %{sender: sender} = state) do
    NotificationCenter.send(sender)

    {:stop, :normal, state}
  end

  def handle_info(_message, state), do: {:noreply, state}

  def delay(delay) do
    # in minutes
    Process.send_after(self(), :send, delay * 60 * 1000)
  end
end

The worker does only one thing: it calls a send function after a given delay.

Everything works fine with a single node. However, when I start a new node and connect it to the first one, I can't find my workers from the second node:

Swarm.members(:notification_sender) == []
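(For reference, the registry can also be inspected directly; Swarm.registered/0 and Swarm.whereis_name/1 are part of the public API, and the UUID below is just one of the generated worker names visible in the logs further down.)

# List every name/pid pair the local Swarm registry knows about.
Swarm.registered()

# Look up a single worker by the UUID it was registered under.
Swarm.whereis_name("a2762c69-e1f7-4d56-b2d8-bc44bff3527a")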

Here are the logs from my first node (the one from which I started the workers):

*DBG* 'Elixir.Swarm.Tracker' receive info {nodeup,'node2@192.168.1.21',[{node_type,visible}]} in state tracking
[info] [swarm on node1@192.168.1.21] [tracker:ensure_swarm_started_on_remote_node] nodeup node2@192.168.1.21
*DBG* 'Elixir.Swarm.Tracker' consume info {nodeup,'node2@192.168.1.21',[{node_type,visible}]} in state tracking
[info] [swarm on node1@192.168.1.21] [tracker:cluster_wait] joining cluster..
[info] [swarm on node1@192.168.1.21] [tracker:cluster_wait] found connected nodes: [:"node2@192.168.1.21"]
[info] [swarm on node1@192.168.1.21] [tracker:cluster_wait] selected sync node: node2@192.168.1.21
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync,<27946.629.0>} in state syncing
[info] [swarm on node1@192.168.1.21] [tracker:syncing] there is a tie between syncing nodes, breaking with die roll (5)..
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync,<27946.629.0>} in state syncing
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_begin_tiebreaker,<27946.629.0>,1} in state syncing
[info] [swarm on node1@192.168.1.21] [tracker:syncing] there is a tie between syncing nodes, breaking with die roll (13)..
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_begin_tiebreaker,<27946.629.0>,1} in state syncing
[info] [swarm on node1@192.168.1.21] [tracker:syncing] we won the die roll (13 vs 1), sending registry..
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_end_tiebreaker,<27946.629.0>,5,19} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' postpone cast {sync_end_tiebreaker,<27946.629.0>,5,19} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_recv,<27946.629.0>,{{0,1},0},[]} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_recv,<27946.629.0>,{{0,1},0},[]} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_ack,'node2@192.168.1.21'} in state awaiting_sync_ack
[info] [swarm on node1@192.168.1.21] [tracker:awaiting_sync_ack] received sync acknowledgement from node2@192.168.1.21
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_ack,'node2@192.168.1.21'} in state awaiting_sync_ack
[info] [swarm on node1@192.168.1.21] [tracker:resolve_pending_sync_requests] pending sync requests cleared
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_end_tiebreaker,<27946.629.0>,5,19} in state tracking
[warn] [swarm on node1@192.168.1.21] [tracker:handle_cast] unrecognized cast: {:sync_end_tiebreaker, #PID<27946.629.0>, 5, 19}

iex(node1@192.168.1.21)2> Swarm.members(:notification_sender)
[#PID<0.888.0>, #PID<0.901.0>]

Here are the logs from the second node:

iex(node2@192.168.1.21)2> Node.connect(:"node1@192.168.1.21")
*DBG* 'Elixir.Swarm.Tracker' receive info {nodeup,'node1@192.168.1.21',[{node_type,visible}]} in state tracking
true
iex(node2@192.168.1.21)3> [info] [swarm on node2@192.168.1.21] [tracker:ensure_swarm_started_on_remote_node] nodeup node1@192.168.1.21
[info] [swarm on node2@192.168.1.21] [tracker:cluster_wait] joining cluster..
*DBG* 'Elixir.Swarm.Tracker' consume info {nodeup,'node1@192.168.1.21',[{node_type,visible}]} in state tracking
[info] [swarm on node2@192.168.1.21] [tracker:cluster_wait] found connected nodes: [:"node1@192.168.1.21"]
[info] [swarm on node2@192.168.1.21] [tracker:cluster_wait] selected sync node: node1@192.168.1.21
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync,<29111.620.0>} in state syncing
[info] [swarm on node2@192.168.1.21] [tracker:syncing] there is a tie between syncing nodes, breaking with die roll (1)..
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync,<29111.620.0>} in state syncing
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_begin_tiebreaker,<29111.620.0>,5} in state syncing
[info] [swarm on node2@192.168.1.21] [tracker:syncing] there is a tie between syncing nodes, breaking with die roll (19)..
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_begin_tiebreaker,<29111.620.0>,5} in state syncing
[info] [swarm on node2@192.168.1.21] [tracker:syncing] we won the die roll (19 vs 5), sending registry..
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_end_tiebreaker,<29111.620.0>,1,13} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' postpone cast {sync_end_tiebreaker,<29111.620.0>,1,13} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_recv,<29111.620.0>,
         {{0,1},10},
         [{entry,<<"a2762c69-e1f7-4d56-b2d8-bc44bff3527a">>,<29111.888.0>,
              #Ref<29111.1533303619.3224371201.128918>,
              #{mfa =>
                    {'Elixir.ModNotifications.Workers.Sender',register,
                        [<<"a2762c69-e1f7-4d56-b2d8-bc44bff3527a">>,
                         {'Elixir.ModNotifications.Workers.Delayer',
                             #{delay => 14,
                               sender =>
                                   ...long content....}]},
                notification_sender => true},
              {{0,5},{1,5}}},
          {entry,<<"59635bd8-fc95-4134-8cf0-4459cdc87fe7">>,<29111.901.0>,
              #Ref<29111.1533303619.3224371201.129074>,
              #{mfa =>
                    {'Elixir.ModNotifications.Workers.Sender',register,
                        [<<"59635bd8-fc95-4134-8cf0-4459cdc87fe7">>,
                         {'Elixir.ModNotifications.Workers.Delayer',
                             #{delay => 29,
                               sender =>
                                   ...long content....}]},
                notification_sender => true},
              {{0,10},{1,10}}}]} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_recv,<29111.620.0>,
         {{0,1},10},
         [{entry,<<"a2762c69-e1f7-4d56-b2d8-bc44bff3527a">>,<29111.888.0>,
              #Ref<29111.1533303619.3224371201.128918>,
              #{mfa =>
                    {'Elixir.ModNotifications.Workers.Sender',register,
                        [<<"a2762c69-e1f7-4d56-b2d8-bc44bff3527a">>,
                         {'Elixir.ModNotifications.Workers.Delayer',
                             #{delay => 14,
                               sender =>
                                   ...long content....}]},
                notification_sender => true},
              {{0,5},{1,5}}},
          {entry,<<"59635bd8-fc95-4134-8cf0-4459cdc87fe7">>,<29111.901.0>,
              #Ref<29111.1533303619.3224371201.129074>,
              #{mfa =>
                    {'Elixir.ModNotifications.Workers.Sender',register,
                        [<<"59635bd8-fc95-4134-8cf0-4459cdc87fe7">>,
                         {'Elixir.ModNotifications.Workers.Delayer',
                             #{delay => 29,
                               sender =>
                                   ...long content....}]},
                notification_sender => true},
              {{0,10},{1,10}}}]} in state awaiting_sync_ack
*DBG* 'Elixir.Swarm.Tracker' receive cast {sync_ack,'node1@192.168.1.21'} in state awaiting_sync_ack
[info] [swarm on node2@192.168.1.21] [tracker:awaiting_sync_ack] received sync acknowledgement from node1@192.168.1.21
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_ack,'node1@192.168.1.21'} in state awaiting_sync_ack
[info] [swarm on node2@192.168.1.21] [tracker:resolve_pending_sync_requests] pending sync requests cleared
[warn] [swarm on node2@192.168.1.21] [tracker:handle_cast] unrecognized cast: {:sync_end_tiebreaker, #PID<29111.620.0>, 1, 13}
*DBG* 'Elixir.Swarm.Tracker' consume cast {sync_end_tiebreaker,<29111.620.0>,1,13} in state tracking
Swarm.members(:notification_sender)
[]
iex(node2@192.168.1.21)4> *DBG* 'Elixir.Swarm.Tracker' receive info {nodedown,'node1@192.168.1.21',[{node_type,visible}]} in state tracking
[info] [swarm on node2@192.168.1.21] [tracker:nodedown] nodedown node1@192.168.1.21
[debug] [swarm on node2@192.168.1.21] [tracker:handle_topology_change] topology change (nodedown for node1@192.168.1.21)
*DBG* 'Elixir.Swarm.Tracker' consume info {nodedown,'node1@192.168.1.21',[{node_type,visible}]} in state tracking
[info] [swarm on node2@192.168.1.21] [tracker:handle_topology_change] topology change complete

iex(node2@192.168.1.21)4> Swarm.members(:notification_sender)
[]

Did I do or understand anything wrong?

I expected the second node to be able to find my workers and to take over my workers if the first node crashes.
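In other words, once the registries are in sync I would expect group calls like these to work from the second node (Swarm.publish/2 and Swarm.multi_call/2 are the documented group functions; the messages here are only placeholders, my workers don't actually handle them):

# Broadcast a message to every member of the group, wherever it runs.
Swarm.publish(:notification_sender, :ping)

# Or call each member and collect the replies.
Swarm.multi_call(:notification_sender, :get_state)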

bitwalker commented 6 years ago

Could you give master a try? I just pushed a fix which addresses some synchronization issues and may resolve your problem. If you are still having issues, please gist your logs again (from all nodes) and I will take a look.

FabienHenon commented 6 years ago

Thank you, this is now working fine :) Should I update to version 3.2.1, or pin directly to the latest commit's ref?
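(For reference, the two options in mix.exs would look roughly like this; which one actually contains the fix is exactly what I'm asking, and the ref below is only a placeholder, not a real commit hash.)

# Either take the released package from Hex...
{:swarm, "~> 3.2"}

# ...or track the repository directly until the next release
# (replace the placeholder ref with the commit that contains the fix).
{:swarm, github: "bitwalker/swarm", ref: "<commit-with-the-fix>"}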