bitwalker / swarm

Easy clustering, registration, and distribution of worker processes for Erlang/Elixir
MIT License
1.2k stars 103 forks source link

Swarm graceful application handoff vs process kill #117

Open cburman01 opened 5 years ago

cburman01 commented 5 years ago

Hello, I have observed a peculiar behavior and was hoping to get some guidance.

I have noticed that processes are handed off between nodes differently when I gracefully terminate the application running as a systemd service (e.g. systemctl stop) versus when I just kill the BEAM pid (e.g. kill -9 PID).

Here is the debug info when I just kill the pid: Nov 14 09:18:32 deverlapp02

Nov 14 09:18:32 dev02 ecommawarehouse[10858]: [debug] [swarm on ecommawarehouse_node@dev02] [tracker:handle_topology_change] restarting :"030_wrk" on ecommawarehouse_node@dev02 Nov 14 09:18:32 deverlapp02 ecommawarehouse[10858]: [debug] [swarm on ecommawarehouse_node@dev02] [tracker:do_track] starting :"030_wrk" on ecommawarehouse_node@dev02

Here is the debug info when I just stop the systemd service on one of the nodes:

Nov 14 09:25:17 deverlapp01 ecommawarehouse[5208]: [debug] [swarm on ecommawarehouse_node@dev01 [tracker:handle_monitor] :"030_wrk" is down: :shutdown Nov 14 09:25:17 deverlapp01 ecommawarehouse[5208]: [warn] [swarm on ecommawarehouse_node@dev01] [tracker:broadcast_event] broadcast of event ({:untrack, #PID<29757.1594.0>}) was not recevied by [:"ecommawarehouse_node@dev01"]

I am just creating a supervisor that allows each pid to be registered on only one of the nodes in the cluster. That works fine. The only problem I am having is that when systemd gracefully terminates the application, it is as if different handoff code runs.

Here are the workers that actually get started. They are generic and depend on DB data:

defmodule EcommAwarehouseCore.DataWorker do
  @moduledoc """
  Generic polling worker that is started through Swarm.

  The worker is started from a seed map with the keys `:name`, `:module` and
  `:args`. Every `args.timeout` milliseconds it calls `args.function` with
  `args.zone`; when the returned data differs from the last value it has seen,
  it broadcasts the first element on `"channel:data"` under the `args.event`
  event and remembers the new data as `:current`.
  """

  use GenServer
  require Logger

  @doc """
  Entry point used as the Swarm `{module, function, args}` start callback.

  Returns `{:ok, pid}`; any other result from `start_link/1` raises a
  `MatchError`.
  """
  def register(args) do
    {:ok, _pid} = start_link(args)
  end

  @doc """
  Starts the worker using `seed.module` as the callback module and registers
  it locally under `seed.name`.
  """
  def start_link(seed) do
    GenServer.start_link(seed.module, seed, name: seed.name)
  end

  @impl true
  def init(state) do
    # Let the local registry monitor this process so it can be restarted.
    EcommAwarehouseCore.Registry.monitor(self(), state.name, state.module, state.args)
    get_data(state.args.timeout)
    {:ok, Map.put(state, :current, [])}
  end

  @impl true
  def handle_call(:get, _from, state) do
    {:reply, state.current, state}
  end

  # Swarm accepts `:restart`, `{:resume, handoff_state}` or `:ignore` here; a
  # bare `:resume` is not a valid reply. The last polled data is handed over so
  # the replacement process does not re-broadcast unchanged data.
  def handle_call({:swarm, :begin_handoff}, _from, state) do
    Logger.info("[Worker] begin_handoff: #{state.name}")
    {:reply, {:resume, state.current}, state}
  end

  # Swarm delivers the handoff state as the third tuple element.
  @impl true
  def handle_cast({:swarm, :end_handoff, current}, state) do
    Logger.info("[Worker] end_handoff: #{state.name}")
    {:noreply, %{state | current: current}}
  end

  # The other side's state is ignored; this process keeps its own data.
  def handle_cast({:swarm, :resolve_conflict, _other_current}, state) do
    Logger.info("[Worker] resolve_conflict: #{state.name}")
    {:noreply, state}
  end

  @impl true
  def handle_info(:get_data, state) do
    # The case result must be bound explicitly: rebinding `state` inside a
    # clause does not escape the clause, which previously lost every update.
    new_state =
      case state.args.function.(state.args.zone) do
        {:ok, []} ->
          state

        {:ok, data} ->
          if data == state.current do
            state
          else
            IO.puts("Changed Detected Broadcasting")

            EcommAwarehouseWeb.Endpoint.broadcast(
              "channel:data",
              state.args.event,
              List.first(data)
            )

            %{state | current: data}
          end

        {:error, reason} ->
          # `reason` may be any term, so it has to be inspected before logging.
          Logger.error("[Worker] get_data failed for #{state.name}: #{inspect(reason)}")
          state
      end

    get_data(new_state.args.timeout)
    {:noreply, new_state}
  end

  def handle_info({:swarm, :die}, state) do
    Logger.info("[Worker] swarm stopping worker: #{state.name}")
    {:stop, :normal, state}
  end

  # Schedules the next poll.
  defp get_data(timeout) do
    Process.send_after(self(), :get_data, timeout)
  end
end

Here is the registry:

defmodule EcommAwarehouseCore.Registry do
  @moduledoc """
  Local registry that starts workers through Swarm and restarts them when they
  go down abnormally.

  The process state is a map of monitor reference to `{name, module, args}`,
  filled by `monitor/4`. A `:DOWN` with reason `:normal` or `:shutdown` only
  drops the entry; any other reason re-registers the worker through Swarm.
  """

  use GenServer
  require Logger

  @name __MODULE__
  @pg2_group :workers

  ## Client API

  def start_link(_) do
    Logger.info("Starting Registry")
    GenServer.start_link(__MODULE__, [], name: @name)
  end

  @doc """
  Starts (or looks up) the worker `name` through the registry process.

  Returns `{:ok, pid}` or `{:error, reason}`.
  """
  def register(name, module, args) do
    GenServer.call(@name, {:register, {name, module, args}})
  end

  @doc """
  Asks the registry to monitor `pid` and remember how to restart it.
  """
  def monitor(pid, name, module, args) do
    GenServer.cast(@name, {:monitor, pid, {name, module, args}})
  end

  @doc """
  Registers `name` with Swarm, starting it via `module.register/1`.

  On a fresh start the pid also joins the Swarm group `args.registry`. A name
  that is already registered is treated as success and its pid is returned.
  Returns `{:ok, pid}` or `{:error, reason}`. `reason` (the fourth argument) is
  only used as the verb in the debug log line.
  """
  def start_via_swarm(name, module, args, reason \\ "starting") do
    Logger.debug("[Registry] #{reason} #{name}")
    seed = %{name: name, module: module, args: args}

    case Swarm.register_name(name, module, :register, [seed]) do
      {:ok, pid} ->
        Swarm.join(args.registry, pid)
        {:ok, pid}

      {:error, {:already_registered, pid}} ->
        {:ok, pid}

      # Any other failure is reported instead of being mistaken for a pid.
      {:error, error} ->
        {:error, error}
    end
  end

  @doc """
  Collects the `:get` replies of every member of the `:workers` and `:work`
  Swarm groups, each concatenated into a single list.
  """
  def get_all_values() do
    %{
      workers: :workers |> Swarm.multi_call(:get) |> concat_replies(),
      work: :work |> Swarm.multi_call(:get) |> concat_replies()
    }
  end

  ## Server callbacks

  @impl true
  def init(_) do
    Process.send_after(self(), :log_state, 500)
    Process.send_after(self(), :load_zones, 1000)
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  @impl true
  def handle_call({:register, {name, module, args}}, _from, state) do
    case start_via_swarm(name, module, args) do
      {:ok, pid} ->
        {:reply, {:ok, pid}, state}

      {:error, reason} ->
        Logger.error("[Registry] error starting #{name} - #{inspect(reason)}")
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_cast({:monitor, pid, {name, module, args}}, state) do
    # NOTE(review): :pg2 was removed in OTP 24; moving to :pg needs a started
    # scope elsewhere in the application, so the calls are left unchanged here.
    :pg2.create(@pg2_group)
    :pg2.join(@pg2_group, pid)
    ref = Process.monitor(pid)
    {:noreply, Map.put(state, ref, {name, module, args})}
  end

  @impl true
  def handle_info(:load_zones, state) do
    case EcommAwarehouseCore.Repo.get_zones() do
      {:ok, zones} when is_list(zones) ->
        Enum.each(zones, &start_zone_workers/1)

      other ->
        Logger.error("[Registry] could not load zones: #{inspect(other)}")
    end

    {:noreply, state}
  end

  def handle_info(:log_state, state) do
    total = Enum.count(Swarm.registered())
    local = map_size(state)
    Logger.debug("[Registry] Totals:  Swarm/#{total} Local/#{local}")
    Process.send_after(self(), :log_state, 10000)
    {:noreply, state}
  end

  # Graceful exits are not restarted; the entry is only forgotten.
  def handle_info({:DOWN, ref, :process, _pid, reason}, state)
      when reason in [:normal, :shutdown] do
    {:noreply, Map.delete(state, ref)}
  end

  def handle_info({:DOWN, ref, :process, pid, reason}, state) do
    # References and pids do not implement String.Chars, so they are inspected.
    Logger.error(
      "[Registry] #{inspect(pid)} (#{inspect(ref)}) went down: #{inspect(reason)}"
    )

    case Map.pop(state, ref) do
      {nil, state} ->
        {:noreply, state}

      {{name, module, args}, state} ->
        case start_via_swarm(name, module, args, "restarting") do
          {:ok, _pid} ->
            :ok

          {:error, error} ->
            Logger.error("[Registry] error restarting #{name} - #{inspect(error)}")
        end

        {:noreply, state}
    end
  end

  # trap_exit is enabled in init/1, so :EXIT (and any other stray message)
  # must not crash the registry with a FunctionClauseError.
  def handle_info(msg, state) do
    Logger.debug("[Registry] ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  ## Helpers

  # Starts the two workers ("<zone>_wrk" and "<zone>_wrker") for one zone row.
  defp start_zone_workers(x) do
    zone = String.to_charlist(x.zone)

    # NOTE(review): atoms are created from DB data and are never garbage
    # collected; acceptable only while the set of zones is small and bounded.
    start_via_swarm(
      String.to_atom("#{x.zone}_wrk"),
      EcommAwarehouseCore.DataWorker,
      %{
        function: &EcommAwarehouseCore.Repo.get_pk_zone_wrk/1,
        zone: zone,
        timeout: 5000,
        event: "update_zone",
        registry: :work
      }
    )

    start_via_swarm(
      String.to_atom("#{x.zone}_wrker"),
      EcommAwarehouseCore.DataWorker,
      %{
        function: &EcommAwarehouseCore.Repo.get_pk_zone_wrkers/1,
        zone: zone,
        timeout: 5000,
        event: "update_worker",
        registry: :workers
      }
    )
  end

  # Concatenates the reply lists, later replies first (same order as before).
  defp concat_replies(replies) do
    List.foldl(replies, [], &(&1 ++ &2))
  end
end
alex88 commented 5 years ago

Hi @cburman01, did you find a solution for this in the meantime? I've just created a forum topic about this because I thought it was obvious that the process would be restarted: https://elixirforum.com/t/genserver-isnt-restarted-on-graceful-shutdown/20523 — however, it seems it's not only my problem.

alex88 commented 5 years ago

Seems like it needs to be manually handled https://github.com/bitwalker/swarm/pull/83 😕

balena commented 4 years ago

I just added a PR adding a restart parameter to Swarm.register_name so the caller can decide what to do when the pid terminates gracefully (or not), here.

The parameter name and its values were copied from Supervisor and carry roughly the same semantics, with the difference that a :DOWN with reason :noconnection always restarts the process (when the node goes down, just as before). The previous behaviour can be maintained if you set :transient (restart only if the node goes down abruptly).