quantum-elixir / quantum-core

:watch: Cron-like job scheduler for Elixir
https://hexdocs.pm/quantum/
Apache License 2.0

How to set up quantum with libcluster #485

Open wingyplus opened 3 years ago

wingyplus commented 3 years ago

I have a question about how to configure quantum to run with libcluster. My use case is to run n nodes and have quantum run each job only once. From the issues I can see there used to be a global mode, but it seems it was removed in an older version. Do we have any way to tackle this use case?

felipeloha commented 3 years ago

I tried adding quantum-swarm, global: true, and the Random strategy, but it didn't work. Help :)

wingyplus commented 3 years ago

> I tried adding quantum-swarm, global: true, and the Random strategy, but it didn't work. Help :)

@felipeloha Currently I use a library called highlander to ensure only one scheduler process runs in the cluster (on a random node), and I set the run strategy to Quantum.RunStrategy.Local.
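
For reference, a minimal sketch of that combination (assuming a scheduler defined with use Quantum, otp_app: :my_app; all names here are placeholders):

# in application.ex: Highlander registers the child under a :global name,
# so at most one scheduler instance runs across the whole cluster
children = [
  # other children
  {Highlander, MyApp.Scheduler}
]

# in config.exs: with a single scheduler, jobs should run on its node
config :my_app, MyApp.Scheduler,
  run_strategy: Quantum.RunStrategy.Local,
  jobs: [
    # {"* * * * *", {MyModule, :my_fun, []}}
  ]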

felipeloha commented 3 years ago

Thanks for the hint. One thing I don't understand: where should I set the local strategy?

I tried using highlander to make the scheduler run on only 1 node with run_strategy: {Quantum.RunStrategy.Random, :cluster}, and it works, but when it tries to run a job on a separate node I get: app2_1 | 06:08:50.422 [warn] Node :"app@hi_ernesta.host" is not running. Job :hi_job could not be executed.

How did you solve this? Or are your jobs running on only one node?

Thanks in advance

wingyplus commented 3 years ago

@felipeloha Since you wrap the scheduler process in highlander, only one scheduler process runs at a time per cluster (not per node). Setting the strategy to Quantum.RunStrategy.Local makes jobs run only on the node where the scheduler itself is running.

The warning you saw happens because quantum tries to run the job on a random node; that doesn't work because the Random strategy expects quantum to be running on every node in the cluster, which in your case it isn't, because of highlander.
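
In config terms, the difference looks roughly like this (a sketch; the Random form is the one quoted above):

# Random picks a node from the cluster for each run and expects quantum
# to be running there:
config :my_app, MyApp.Scheduler,
  run_strategy: {Quantum.RunStrategy.Random, :cluster}

# Local always runs jobs on the node where the scheduler itself runs:
config :my_app, MyApp.Scheduler,
  run_strategy: Quantum.RunStrategy.Local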

felipeloha commented 3 years ago

I tried adding quantum-swarm + global: true + the Random strategy, and the logs say the nodes are found, but then the jobs run on all nodes instead of just one.

Is there a way to run the jobs in a "distributed"/random manner in the cluster?

wingyplus commented 3 years ago

@felipeloha I've never used the swarm extension, but my guess is that its scheduler doesn't coordinate between nodes.

The hacky way I can think of is to use :global to perform a distributed lock.
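
Something along these lines, for example (run_exclusively/2 is a hypothetical wrapper, not part of quantum):

# Take a cluster-wide lock before running the job body. With 0 retries,
# a node that cannot get the lock returns :aborted instead of waiting,
# so at most one node executes fun at a time.
def run_exclusively(job_name, fun) do
  :global.trans({job_name, self()}, fun, [node() | Node.list()], 0)
end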

Matsa59 commented 3 years ago

:warning: The code below doesn't ensure that a job that is mid-execution will finish or be executed as expected.

This code was manually tested with 1, 2, and 3 nodes.

I don't think you really need horde or the like.

In your application.ex, register your Quantum scheduler using a "custom" supervisor module (see below for details):

children = [
  # other children
  %{
    id: MyApp.SchedulerSupervisor,
    start: {MyApp.Scheduler, :start_link, [[supervisor_module: MyApp.SchedulerSupervisor]]}
  }
]

In scheduler_supervisor.ex:

defmodule MyApp.SchedulerSupervisor do
  def start_link(quantum, opts) do
    opts = Keyword.put(opts, :name, {:global, __MODULE__})

    # Serialise startup across the cluster: only one node runs this at a time.
    :global.trans({__MODULE__, :start_link}, fn ->
      case :global.whereis_name(__MODULE__) do
        :undefined ->
          Quantum.Supervisor.start_link(quantum, opts)

        pid ->
          # If the process is fine in the global table then simply link it to the current process.
          Process.link(pid)
          {:ok, pid}
      end
    end)
  end
end

The main purpose of this module is to change how the Quantum supervisor is started.

:global.trans/2 ensures the function is not executed at the same time on multiple nodes (https://erlang.org/doc/man/global.html#trans-2).


Now let's manage how libcluster connects to the other nodes.

In cluster.ex:

defmodule MyApp.Cluster do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  def init(state), do: {:ok, state}

  def connect_node(node) do
    # retrieve the pid of our SchedulerSupervisor (in the global table)
    pid = :global.whereis_name(MyApp.SchedulerSupervisor)

    # connect to the specified node
    result = :net_kernel.connect_node(node)

    if result && is_pid(pid) do
      # if we successfully connect and there is an instance of our global SchedulerSupervisor then stop it
      Supervisor.terminate_child(MyApp.Supervisor, MyApp.SchedulerSupervisor)
    end

    # Force sync the global table
    :global.sync()

    # restart the SchedulerSupervisor (this will call `start_link` in `MyApp.SchedulerSupervisor`)
    Supervisor.restart_child(MyApp.Supervisor, MyApp.SchedulerSupervisor)

    result
  end
end

:global.sync() MUST be executed, and it must run after the supervisor has been stopped. Otherwise you'll get a global name error.

In config.exs:

config :my_app,
  libcluster: [
    example: [
      connect: {MyApp.Cluster, :connect_node, []},
      # config ...
    ]
  ]

:information_source: This isn't strictly tied to libcluster: you could also use :net_kernel.monitor_nodes(true) and put the code of MyApp.Cluster.connect_node/1 in a handle_info({:nodeup, node}, state) callback, as sketched below.
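
A sketch of that variant (MyApp.ClusterMonitor is a made-up name; same caveats as the code above):

defmodule MyApp.ClusterMonitor do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  def init(state) do
    # subscribe this process to {:nodeup, node} / {:nodedown, node} messages
    :ok = :net_kernel.monitor_nodes(true)
    {:ok, state}
  end

  def handle_info({:nodeup, _node}, state) do
    # same dance as MyApp.Cluster.connect_node/1, minus the connect call:
    # stop the global supervisor, sync the global table, then restart it
    if is_pid(:global.whereis_name(MyApp.SchedulerSupervisor)) do
      Supervisor.terminate_child(MyApp.Supervisor, MyApp.SchedulerSupervisor)
    end

    :global.sync()
    Supervisor.restart_child(MyApp.Supervisor, MyApp.SchedulerSupervisor)
    {:noreply, state}
  end

  def handle_info({:nodedown, _node}, state), do: {:noreply, state}
end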

Dunno if the maintainer of this lib wants this kind of code in the lib (using the :net_kernel.monitor_nodes(true) approach).

Matsa59 commented 2 years ago

So, an update on this: I've got a better solution:

defmodule MyApp.SchedulerSupervisor do
  def start_link(quantum, opts) do
    case :global.whereis_name(__MODULE__) do
      :undefined ->
        # No instance registered yet: try to start one. If another node won
        # the race in the meantime, just link to the winner instead.
        with {:error, {:already_started, pid}} <- do_start_link(quantum, opts) do
          Process.link(pid)
          {:ok, pid}
        end

      pid ->
        # An instance already runs somewhere in the cluster: link to it so
        # the local supervision tree notices if it dies.
        Process.link(pid)
        {:ok, pid}
    end
  end

  defp do_start_link(quantum, opts) do
    Supervisor.start_link(__MODULE__, {quantum, opts}, name: {:global, __MODULE__})
  end

  def init(state) do
    # Re-register under the global name in case two nodes started an instance
    # while the cluster was still forming; the resolver below picks a survivor.
    :global.re_register_name(__MODULE__, self(), &resolve_global_conflict/3)
    Quantum.Supervisor.init(state)
  end

  defp resolve_global_conflict(_name, pid_to_keep, pid_to_kill) do
    Supervisor.stop(pid_to_kill)
    pid_to_keep
  end
end

Using a globally registered server, I ensure there is only one instance of the quantum supervisor. Then I force re_register_name because, if your apps start and join the cluster at the same time, I noticed some weird side effects. Finally, resolve_global_conflict/3 defines what happens when two processes register the same name in the global registry.
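
(Wiring is presumably the same as in the earlier snippet: start the scheduler from application.ex with supervisor_module: MyApp.SchedulerSupervisor in its child spec.)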

Hope it helps.

PS: one thing to know: you can't control which process will be killed :/