derekkraan / horde

Horde is a distributed Supervisor and Registry backed by DeltaCrdt
MIT License
1.32k stars 106 forks source link

DeltaCrdt BadFunctionError on diffs_to_callback/3 #166

Closed jeffgrunewald closed 5 years ago

jeffgrunewald commented 5 years ago

When updating the module-based Horde implementation in our application we receive an odd failure that causes a horde_shutting_down event.

Error message:

11:39:13.204 [error] GenServer Reaper.Horde.Supervisor.Crdt terminating
** (BadFunctionError) function #Function<0.61700763/1 in Horde.DynamicSupervisor> is invalid, likely because it points to an old version of the code
    (delta_crdt) lib/delta_crdt/causal_crdt.ex:376: DeltaCrdt.CausalCrdt.diffs_to_callback/3
    (delta_crdt) lib/delta_crdt/causal_crdt.ex:396: DeltaCrdt.CausalCrdt.update_state_with_delta/3
    (delta_crdt) lib/delta_crdt/causal_crdt.ex:193: DeltaCrdt.CausalCrdt.handle_call/3
    (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from Reaper.Horde.Supervisor): {:operation, {:add, [{:member_node_info, {Reaper.Horde.Supervisor, :nonode@nohost}}, %Horde.DynamicSupervisor.Member{name: {Reaper.Horde.Supervisor, :nonode@nohost}, status: :shutting_down}]}}

11:39:13.206 [error] GenServer Reaper.Horde.Supervisor terminating
** (stop) exited in: GenServer.call(Reaper.Horde.Supervisor.Crdt, {:operation, {:add, [{:member_node_info, {Reaper.Horde.Supervisor, :nonode@nohost}}, %Horde.DynamicSupervisor.Member{name: {Reaper.Horde.Supervisor, :nonode@nohost}, status: :shutting_down}]}}, :infinity)
    ** (EXIT) an exception was raised:
        ** (BadFunctionError) function #Function<0.61700763/1 in Horde.DynamicSupervisor> is invalid, likely because it points to an old version of the code
            (delta_crdt) lib/delta_crdt/causal_crdt.ex:376: DeltaCrdt.CausalCrdt.diffs_to_callback/3
            (delta_crdt) lib/delta_crdt/causal_crdt.ex:396: DeltaCrdt.CausalCrdt.update_state_with_delta/3
            (delta_crdt) lib/delta_crdt/causal_crdt.ex:193: DeltaCrdt.CausalCrdt.handle_call/3
            (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:690: :gen_server.handle_msg/6
            (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:989: GenServer.call/3
    (horde) lib/horde/dynamic_supervisor_impl.ex:79: Horde.DynamicSupervisorImpl.handle_call/3
    (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.1837.0>): :horde_shutting_down

11:39:13.208 [error] GenServer #PID<0.1837.0> terminating
** (stop) exited in: GenServer.call(Reaper.Horde.Supervisor, :horde_shutting_down, 5000)
    ** (EXIT) exited in: GenServer.call(Reaper.Horde.Supervisor.Crdt, {:operation, {:add, [{:member_node_info, {Reaper.Horde.Supervisor, :nonode@nohost}}, %Horde.DynamicSupervisor.Member{name: {Reaper.Horde.Supervisor, :nonode@nohost}, status: :shutting_down}]}}, :infinity)
        ** (EXIT) an exception was raised:
            ** (BadFunctionError) function #Function<0.61700763/1 in Horde.DynamicSupervisor> is invalid, likely because it points to an old version of the code
                (delta_crdt) lib/delta_crdt/causal_crdt.ex:376: DeltaCrdt.CausalCrdt.diffs_to_callback/3
                (delta_crdt) lib/delta_crdt/causal_crdt.ex:396: DeltaCrdt.CausalCrdt.update_state_with_delta/3
                (delta_crdt) lib/delta_crdt/causal_crdt.ex:193: DeltaCrdt.CausalCrdt.handle_call/3
                (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
                (stdlib) gen_server.erl:690: :gen_server.handle_msg/6
                (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:989: GenServer.call/3
    (horde) lib/horde/signal_shutdown.ex:21: anonymous fn/1 in Horde.SignalShutdown.terminate/2
    (elixir) lib/enum.ex:769: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:769: Enum.each/2
    (stdlib) gen_server.erl:673: :gen_server.try_terminate/3
    (stdlib) gen_server.erl:858: :gen_server.terminate/10
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.1832.0>, :shutdown}

Implementation:

defmodule Reaper.Horde.Supervisor do
  @moduledoc """
  Module Based Horde.Supervisor
  """
  use Horde.DynamicSupervisor
  require Logger

  import SmartCity.Event, only: [data_extract_end: 0, file_ingest_end: 0]

  @instance Reaper.Application.instance()

  def start_link(init_args \\ []) do
    Horde.DynamicSupervisor.start_link(__MODULE__, init_args, name: __MODULE__)
  end

  @impl Horde.DynamicSupervisor
  def init(init_args) do
    [members: get_members(), strategy: :one_for_one]
    |> Keyword.merge(init_args)
    |> Horde.DynamicSupervisor.init()
  end

  defp get_members() do
    [Node.self() | Node.list()]
    |> Enum.map(fn node -> {__MODULE__, node} end)
  end

  def start_child(child_spec) do
    Horde.DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  def terminate_child(pid) do
    Horde.DynamicSupervisor.terminate_child(__MODULE__, pid)
  end

  def start_data_extract(%SmartCity.Dataset{} = dataset) do
    Logger.debug(fn -> "#{__MODULE__} Start data extract process for dataset #{dataset.id}" 
  end)

    send_extract_complete_event = fn ->
      Brook.Event.send(@instance, data_extract_end(), :reaper, dataset)
    end

    start_child(
      {Reaper.RunTask,
       name: dataset.id,
       mfa: {Reaper.DataExtract.Processor, :process, [dataset]},
       completion_callback: send_extract_complete_event}
    )
  end
end
derekkraan commented 5 years ago

Hi @jeffgrunewald, can you try out this branch and let me know if it helps the situation?

Just use the following line in your mix deps: {:horde, github: "derekkraan/horde", branch: "on_diffs_mfa"}

jeffgrunewald commented 5 years ago

Works beautifully; many thanks!

derekkraan commented 5 years ago

Great, thanks for testing! I'll merge these branches and push out a new release.

derekkraan commented 5 years ago

v0.7.1