shinyscorpion / task_bunny

TaskBunny is a background processing application written in Elixir that uses RabbitMQ as a messaging backend.
MIT License

[RFC] Add ability to turn workers on/off #40

Closed PhillippOhlandt closed 7 years ago

PhillippOhlandt commented 7 years ago

Hey,

I would like to write a simple web API for my queue worker so I can turn it on and off. It's because the worker will process quite time intensive jobs (10min - 2h). For deployments, I would like to turn it off, wait for it to finish its current job and deploy the new code.

A simple code based API to turn it off/on and check the current status would be enough. I can write the web API on top of it then.

ono commented 7 years ago

https://hexdocs.pm/task_bunny/TaskBunny.WorkerSupervisor.html#graceful_halt/1 can help?
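
For context, a minimal sketch of how that call might look. The 30-second timeout is an arbitrary choice for illustration, and the exact return shapes (`:ok` / `{:error, reason}`) are my reading of the docs, not verified here:

```elixir
# graceful_halt/1 takes a timeout in milliseconds: it asks the workers to
# stop consuming, then waits up to that long for running jobs to finish.
case TaskBunny.WorkerSupervisor.graceful_halt(30_000) do
  :ok -> IO.puts("workers drained")
  {:error, reason} -> IO.puts("halt failed: #{inspect(reason)}")
end
```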

PhillippOhlandt commented 7 years ago

That seemed to stop the worker, but I got an error too:

iex(7)> TaskBunny.WorkerSupervisor.graceful_halt :"TaskBunny.Worker.xxx.normal"

17:48:19.488 [info]  TaskBunny.Worker: stop consuming. Queue: xxxx.normal. Concurrency: 1. PID: #PID<0.343.0>.
** (ArithmeticError) bad argument in arithmetic expression
    (task_bunny) lib/task_bunny/worker_supervisor.ex:74: TaskBunny.WorkerSupervisor.wait_for_all_jobs_done/2
    (task_bunny) lib/task_bunny/worker_supervisor.ex:58: TaskBunny.WorkerSupervisor.graceful_halt/2

Not sure if I used it correctly.

PhillippOhlandt commented 7 years ago

Sorry, I am a bit stupid today (it's the heat). When called with one argument, it expects a timeout, not the pid (not sure if my use of an atom was even valid, lol).

TaskBunny.WorkerSupervisor.graceful_halt 0 works fine.

PhillippOhlandt commented 7 years ago

@ono Can I somehow find out if the workers are consuming or not?

ono commented 7 years ago

Yes, check WorkerSupervisor code. Mainly around https://github.com/shinyscorpion/task_bunny/blob/master/lib/task_bunny/worker_supervisor.ex#L89-L92

PhillippOhlandt commented 7 years ago

Ok, copied some code together and it's working now. Maybe it's worth adding such a status check to task_bunny?

PhillippOhlandt commented 7 years ago

Here is my little custom module, which basically just contains your private functions:

defmodule Worker.Status do

  def running?() do
    get_supervisor_pid()
    |> get_worker_pids()
    |> workers_running?()
  end

  defp get_supervisor_pid() do
    Process.whereis(TaskBunny.WorkerSupervisor)
  end

  defp get_worker_pids(pid) do
    pid
    |> Supervisor.which_children()
    |> Enum.map(fn ({_, child, _, _}) -> child end)
    |> Enum.filter(fn (child) -> is_pid(child) end)
  end

  defp workers_running?(workers) do
    workers
    |> Enum.any?(fn (pid) -> worker_running?(pid) end)
  end

  defp worker_running?(pid) when is_pid(pid) do
    %{runners: runners, consuming: consuming} = GenServer.call(pid, :status)
    runners > 0 || consuming
  end
end
ono commented 7 years ago

Nice job. I think writing and sharing such a neat code snippet is the recommended way, rather than making the library bigger. If it grows beyond a snippet, it can become an independent hex library.

PhillippOhlandt commented 7 years ago

Maybe we could collect these somewhere? The wiki perhaps?

PhillippOhlandt commented 7 years ago

Another quick helper for turning the worker on/off:

defmodule Worker.Control do

  def continue() do
    get_supervisor_pid()
    |> Process.exit(:kill)
  end

  def halt() do
    TaskBunny.WorkerSupervisor.graceful_halt(0)
  end

  defp get_supervisor_pid() do
    Process.whereis(TaskBunny.WorkerSupervisor)
  end
end
PhillippOhlandt commented 7 years ago

@ono I think I found an issue. When I halt and continue multiple times, I see new connections to RabbitMQ in its web control panel. I guess this is not intended?

IanLuites commented 7 years ago

I think that happens because you kill the WorkerSupervisor, which makes the WorkerSupervisor's supervisor (TaskBunny.Supervisor) restart all children to get the WorkerSupervisor up again.

See: supervisor strategy

The connection manager will also be restarted, since the Supervisor also manages the connection pool.

(@ono correct me if I'm wrong.)

So to continue, you will probably need to use Supervisor.restart_child/2 to start the children again manually.
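
A rough sketch of that restart approach, assuming the workers stay registered as children of TaskBunny.WorkerSupervisor after a halt (the `Worker.Restart` module name is made up for illustration):

```elixir
defmodule Worker.Restart do
  # Hypothetical helper: ask the worker supervisor to restart each of its
  # children. Note Supervisor.restart_child/2 only restarts children that
  # have actually terminated; for a child that is still alive it returns
  # {:error, :running} and does nothing.
  def restart_all do
    sup = Process.whereis(TaskBunny.WorkerSupervisor)

    sup
    |> Supervisor.which_children()
    |> Enum.each(fn {id, _pid, _type, _modules} ->
      Supervisor.restart_child(sup, id)
    end)
  end
end
```

That `{:error, :running}` no-op behavior may matter here: if the halted workers never actually terminated, restart_child would silently do nothing.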

IanLuites commented 7 years ago

As for the snippet, just pointing out some changes that make things smaller: (I always bother @ono with that)

  defp get_worker_pids(pid) do
    pid
    |> Supervisor.which_children()
    |> Enum.map(fn ({_, child, _, _}) -> child end)
    |> Enum.filter(fn (child) -> is_pid(child) end)
  end

can also be written as: (the middle line is up for debate)

  defp get_worker_pids(pid) do
    pid
    |> Supervisor.which_children
    |> Enum.map(&elem(&1, 1))
    |> Enum.filter(&is_pid/1)
  end

Same for |> Enum.any?(fn (pid) -> worker_running?(pid) end) to |> Enum.any?(&worker_running?/1).

PhillippOhlandt commented 7 years ago

Seems like restart_child doesn't work.

  defp restart_workers(sup, workers) do
    workers
    |> Enum.each(fn(worker) ->
      Supervisor.restart_child(sup, worker)
    end)
  end

I get no log message that the worker starts consuming again.

PhillippOhlandt commented 7 years ago

The connection manager will also be restarted, since the Supervisor also manages the connection pool.

This sounds good to me, but I think the issue is that the old connection won't be cleaned up.