bitwalker / swarm

Easy clustering, registration, and distribution of worker processes for Erlang/Elixir
MIT License

Feature request - Swarm.send_after/3 #115

Open devonestes opened 5 years ago

devonestes commented 5 years ago

First off, thanks for the awesome library! It sure makes clustering easy when running Kubernetes.

I ran into an issue today. I want to send a message, after a given delay, to a process registered with Swarm (via Swarm.register_name/4), but I want Swarm to look up the PID at the moment the message is sent, not when I call Process.send_after/4. That way, if the process's PID and node change during the delay, the message is still delivered instead of being dropped because the original process no longer exists.

I tried the following:

Process.send_after(
  {:via, :swarm, process_name(device_id)},
  :do_a_thing,
  @one_hour
)

But the {:via, :swarm, name} tuple doesn't work with Process.send_after/4 the way it does when registering a name for a process: Process.send_after/4 only accepts a PID or a locally registered atom as its destination.
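Resolving the name only when the delay expires can be sketched without Swarm-specific code. The hypothetical `LazySend` module below (not part of Swarm or Elixir) spawns a process that sleeps for the delay and then calls `GenServer.whereis/1`, which accepts `{:via, module, name}` tuples and delegates to the via module's `whereis_name/1`. It is node-local: if the node running the sleeping process goes down, the message is lost, which is the gap a Swarm-registered timer process would close.

```elixir
defmodule LazySend do
  # Hypothetical helper: waits `delay_ms`, then resolves `name` and sends
  # `message` to whichever PID is registered under it at that moment.
  @spec send_after(GenServer.name(), term(), non_neg_integer()) :: pid()
  def send_after(name, message, delay_ms) do
    spawn(fn ->
      Process.sleep(delay_ms)

      # GenServer.whereis/1 calls the via module's whereis_name/1 and
      # returns nil (instead of :undefined) when nothing is registered.
      case GenServer.whereis(name) do
        nil -> :dropped
        pid -> send(pid, message)
      end
    end)
  end
end
```

With Swarm, the call would presumably be `LazySend.send_after({:via, :swarm, process_name(device_id)}, :do_a_thing, @one_hour)`, assuming `:swarm` exports `whereis_name/1` as the via-tuple convention requires.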

I'm probably going to build this on my own anyway since I need this functionality, but if you're interested in having this feature as part of Swarm, let me know and I'll gladly extract it so it can be pushed upstream.

Or, if I'm going about this all wrong and there's a smarter way to do this, just let me know!

Thanks again for the great library!

devonestes commented 5 years ago

So I have a little update on this.

While implementing this yesterday, I found I preferred something slightly different from send_after: it let me go through my GenServer's public API instead of sending it a message directly, which made testing that GenServer much clearer.

How about Swarm.execute_after/3, which looks kind of like this:

Swarm.execute_after(
  :process_name_registered_with_swarm,
  fn pid -> MyServer.perform_a_cast_instead_of_bare_send(pid, :do_a_thing) end,
  1000
)

Now after the given timeout we look up the current PID for the given registered process name, and then call the function we're given, passing in that PID for that name.
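A node-local sketch of the `execute_after/3` idea, using Elixir's built-in `Registry` as a stand-in for Swarm. `LocalExecuteAfter` and `MyServer` are hypothetical names, not Swarm APIs. The scheduled callback goes through `MyServer.do_a_thing/1`, the same public API a test can call, rather than a bare `send/2`.

```elixir
defmodule MyServer do
  # Hypothetical server standing in for the one described in the issue.
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, 0, name: name)

  # Public API: the scheduled callback uses this instead of a bare send.
  def do_a_thing(server), do: GenServer.cast(server, :do_a_thing)
  def count(server), do: GenServer.call(server, :count)

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_cast(:do_a_thing, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:count, _from, count), do: {:reply, count, count}
end

defmodule LocalExecuteAfter do
  # Hypothetical, node-local version of the proposed execute_after/3:
  # resolve the name when the delay expires, then pass the PID to `fun`.
  @spec execute_after(GenServer.name(), (pid() -> term()), non_neg_integer()) :: pid()
  def execute_after(name, fun, delay_ms) when is_function(fun, 1) do
    spawn(fn ->
      Process.sleep(delay_ms)

      case GenServer.whereis(name) do
        nil -> :noproc
        pid -> fun.(pid)
      end
    end)
  end
end
```

Because the callback is just a function of the PID, the server's behaviour can be tested by calling `MyServer.do_a_thing/1` directly, with no timer involved.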

It turned out not to be very difficult to implement, so I'd understand if y'all would rather keep it out of the library, either as a standalone module or as something users implement themselves when they need it. To me, though, it seems really useful, so I'm happy to push either send_after/3 or execute_after/3 upstream if y'all are interested.

arjan commented 5 years ago

This looks nice Devon. However I'm not sure whether this has a place as a part of the library... let me explain:

Maybe the others have different opinions, though, what do you think @bitwalker @beardedeagle ?

devonestes commented 5 years ago

Hey @arjan!

So, yeah, it isn't super difficult to implement, but there is some complexity involved with keeping these timer processes alive in the event of topology changes. Here's the basic implementation I have so far, which should give you some idea of what I'm thinking of.

defmodule SendAfter do
  use GenServer

  @type timer_ref :: reference()

  ## PUBLIC API

  @spec send_after(term, term, non_neg_integer) :: timer_ref()
  def send_after(name, message, delay_in_milliseconds) do
    ref = make_ref()

    Swarm.register_name(ref, GenServer, :start_link, [
      __MODULE__,
      {name, message, delay_in_milliseconds}
    ])

    ref
  end

  @spec execute_after(term(), (pid() -> term()), non_neg_integer()) :: timer_ref()
  def execute_after(name, function, delay_in_milliseconds) do
    ref = make_ref()

    Swarm.register_name(ref, GenServer, :start_link, [
      __MODULE__,
      {name, function, delay_in_milliseconds}
    ])

    ref
  end

  @spec cancel_timer(timer_ref()) :: :ok
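  def cancel_timer(timer_ref) do
    case Swarm.whereis_name(timer_ref) do
      # The timer already fired or was cancelled, so there is nothing to stop
      :undefined -> :ok
      pid -> GenServer.cast(pid, :cancel_timer)
    end
  end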

  ## CALLBACKS

  @impl true
  def init({name, function, delay}) when is_function(function, 1) do
    # Wall-clock time (not monotonic) so the deadline stays meaningful after a handoff to another node
    end_time = System.system_time(:millisecond) + delay
    initial_state = %{end_time: end_time, function: function, process_name: name}
    {:ok, initial_state, {:continue, :after_init}}
  end

  @impl true
  def init({name, message, delay}) do
    end_time = System.system_time(:millisecond) + delay
    initial_state = %{end_time: end_time, message: message, process_name: name}
    {:ok, initial_state, {:continue, :after_init}}
  end

  @impl true
  def handle_continue(:after_init, state) do
    GenServer.cast(self(), :check_end_time)
    {:noreply, state}
  end

  @impl true
  def handle_call({:swarm, :begin_handoff}, _from, state) do
    {:reply, {:resume, state}, state}
  end

  @impl true
  def handle_cast({:swarm, :end_handoff, state}, _state) do
    {:noreply, state}
  end

  @impl true
  def handle_cast({:swarm, :resolve_conflict, _state}, state) do
    {:noreply, state}
  end

  @impl true
  def handle_cast(:cancel_timer, _) do
    {:stop, :normal, nil}
  end

  @impl true
  def handle_cast(:check_end_time, state) do
    if System.system_time(:millisecond) >= state.end_time do
      deliver_message(state)
    else
      wait(state)
    end
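  end

  # Swarm sends {:swarm, :die} to the old process once it has been moved
  # to another node, so shut down here instead of leaving a duplicate timer
  @impl true
  def handle_info({:swarm, :die}, state) do
    {:stop, :shutdown, state}
  end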

  defp deliver_message(state) do
    case Swarm.whereis_name(state.process_name) do
      # The semantics of this case could be totally different, but retrying
      # through wait/1 avoids spinning in a tight loop while the target is
      # unregistered (e.g. mid-handoff)
      :undefined ->
        wait(state)

      pid ->
        deliver_or_execute(pid, state)
        {:stop, :normal, nil}
    end
  end

  defp deliver_or_execute(pid, %{function: function}), do: function.(pid)
  defp deliver_or_execute(pid, %{message: message}), do: send(pid, message)

  defp wait(state) do
    Process.sleep(10) # This interval could be configured by users
    GenServer.cast(self(), :check_end_time)
    {:noreply, state}
  end
end

It's rough, and imperfect for sure, but (as far as I can tell) it won't drop messages that we've queued up if there's a topology change, which is a big step up over the current state of things.
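The polling loop above wakes the process about every 10 ms for the whole delay (roughly 360,000 wake-ups for a one-hour timer). A minimal sketch of an alternative, assuming no Swarm and using a hypothetical `DeadlineTimer` module: keep the absolute wall-clock deadline in state and schedule a single `Process.send_after/3` for the remaining time, so a process started from handed-off state fires at the original deadline. Wall-clock time is used because monotonic time is not comparable across nodes; clock skew between nodes will shift the firing time accordingly.

```elixir
defmodule DeadlineTimer do
  # Hypothetical Swarm-free variant of the SendAfter process above.
  use GenServer

  # Retry interval while the target name is unregistered (an assumption).
  @retry_ms 10

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  # Stand-in for {:swarm, :begin_handoff}: returns state for a new process.
  def handoff_state(timer), do: GenServer.call(timer, :handoff_state)

  @impl true
  def init({:new, name, message, delay_ms}) do
    deadline = System.system_time(:millisecond) + delay_ms
    {:ok, schedule(%{name: name, message: message, deadline: deadline})}
  end

  # Resuming from handed-off state keeps the original absolute deadline.
  def init({:resume, state}), do: {:ok, schedule(state)}

  @impl true
  def handle_call(:handoff_state, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info(:fire, state) do
    case GenServer.whereis(state.name) do
      nil ->
        Process.send_after(self(), :fire, @retry_ms)
        {:noreply, state}

      pid ->
        send(pid, state.message)
        {:stop, :normal, state}
    end
  end

  # One timer for whatever time is left (zero if the deadline has passed).
  defp schedule(state) do
    remaining = max(state.deadline - System.system_time(:millisecond), 0)
    Process.send_after(self(), :fire, remaining)
    state
  end
end
```

Wiring this into Swarm would also mean rescheduling from the `{:swarm, :end_handoff, state}` callback (and cancelling the timer the fresh `init/1` scheduled); that part is left out here.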

arjan commented 5 years ago

This looks great @devonestes, but wouldn't it be better to create a separate library for this? Just trying to keep the codebase small. :-)

beardedeagle commented 5 years ago

I'm wondering whether, once we model this lib after libcluster's app structure, we could have some sort of pluggable system where code like this could be loaded. @bitwalker

devonestes commented 5 years ago

Yeah, I could publish it as a library, but I probably won’t. It’s quite small, and it only works as an extension to swarm. I guess if folks are curious about this kind of thing then the code is already here and they can take it if they want. It doesn’t have to actually be part of swarm.