akira / exq

Job processing library for Elixir - compatible with Resque / Sidekiq

Unable to kill a single process while leaving others running #307

Open taylordowns2000 opened 6 years ago

taylordowns2000 commented 6 years ago
Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Elixir 1.5.1
Exq 0.10.1

Is it possible to kill a single process (a busy worker) in Exq? I can generate a list of all the running processes with Exq.Api.processes(Process.whereis(Exq.Api)).

If I get 10 results in that {:ok, processes} tuple, I'd like to be able to pick out the one that has been running for more than 10 minutes, say, and kill it while leaving the others running.
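One way to pick out the long-running entries is to compare each entry's start time against a cutoff. The sketch below is a hypothetical helper, not Exq API: the module name LongRunning is made up, and it assumes each entry exposes pid as a string and started_at as Unix seconds (a float), matching the Exq.Support.Process struct that appears in the worker state further down. The real shape returned by Exq.Api.processes may differ.

```elixir
defmodule LongRunning do
  # Hypothetical helper, not part of Exq. Each entry is assumed to be a map
  # with :pid (a string like "<0.959.0>") and :started_at (Unix seconds, float).

  # Returns the entries that have been running for more than max_age_s seconds.
  # now_s defaults to the current OS time in seconds, evaluated at call time.
  def select(processes, max_age_s, now_s \\ System.os_time(:millisecond) / 1000) do
    Enum.filter(processes, fn %{started_at: started_at} ->
      now_s - started_at > max_age_s
    end)
  end
end

processes = [
  %{pid: "<0.959.0>", started_at: 900.0},
  %{pid: "<0.960.0>", started_at: 1_550.0}
]

# With "now" fixed at 1_600.0, only the first entry is older than 600 seconds.
LongRunning.select(processes, 600, 1_600.0)
# → [%{pid: "<0.959.0>", started_at: 900.0}]
```

Passing now_s explicitly keeps the function deterministic for testing; in practice it can be left at its default.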

Exq.Api.clear_processes(Process.whereis(Exq.Api)) works fine for clearing the whole list of processes, but when I pass in just the pid of the bad/long-running process, it complains:

** (FunctionClauseError) no function clause matching in GenServer.whereis/1    

    The following arguments were given to GenServer.whereis/1:

        # 1
        "<0.959.0>"

    (elixir) lib/gen_server.ex:945: GenServer.whereis/1
    (elixir) lib/gen_server.ex:764: GenServer.call/3

If I manipulate that pid a bit I get:

iex(8)> Exq.Api.clear_processes(pid("0.959.0"))  
** (exit) exited in: GenServer.call(#PID<0.959.0>, :clear_processes, 5000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:774: GenServer.call/3

And going further down this rabbit hole:

iex(4)> Exq.Api.clear_processes(IEx.Helpers.pid(pid |> String.replace(~r/[<>]/, "")))        
** (exit) exited in: GenServer.call(#PID<0.959.0>, :clear_processes, 5000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) attempted to call GenServer #PID<0.959.0> but no handle_call/3 clause was provided
            (exq) lib/gen_server.ex:598: Exq.Worker.Server.handle_call/3
            (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:774: GenServer.call/3
iex(4)> [error] GenServer #PID<0.959.0> terminating
** (RuntimeError) attempted to call GenServer #PID<0.959.0> but no handle_call/3 clause was provided
    (exq) lib/gen_server.ex:598: Exq.Worker.Server.handle_call/3
    (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.907.0>): :clear_processes
State: %Exq.Worker.Server.State{host: "taylor-ThinkPad-X1-Carbon-5th", job_serialized: "{\"retry\":0,\"queue\":\"browser_runs\",\"jid\":\"73b7faf7-9f23-4a0b-9ed8-97264d809cbc\",\"enqueued_at\":1522354133.120197,\"class\":\"OpenFn.JobRunner\",\"args\":[{\"type\":\"ReceiptMatch\",\"trigger_id\":1,\"receipt_id\":341,\"job_id\":2}]}", manager: #PID<0.669.0>, metadata: Exq.Worker.Metadata, middleware: Exq.Middleware.Server, middleware_state: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, Exq.Middleware.Logger], namespace: "dev_exq", pipeline: %Exq.Middleware.Pipeline{assigns: %{host: "taylor-ThinkPad-X1-Carbon-5th", job: %Exq.Support.Job{args: [%{"job_id" => 2, "receipt_id" => 341, "trigger_id" => 1, "type" => "ReceiptMatch"}], class: "OpenFn.JobRunner", enqueued_at: 1522354133.120197, error_class: nil, error_message: nil, failed_at: nil, finished_at: nil, jid: "73b7faf7-9f23-4a0b-9ed8-97264d809cbc", processor: nil, queue: "browser_runs", retry: 0, retry_count: nil}, job_serialized: "{\"retry\":0,\"queue\":\"browser_runs\",\"jid\":\"73b7faf7-9f23-4a0b-9ed8-97264d809cbc\",\"enqueued_at\":1522354133.120197,\"class\":\"OpenFn.JobRunner\",\"args\":[{\"type\":\"ReceiptMatch\",\"trigger_id\":1,\"receipt_id\":341,\"job_id\":2}]}", manager: #PID<0.669.0>, namespace: "dev_exq", process_info: %Exq.Support.Process{host: "taylor-ThinkPad-X1-Carbon-5th", job: %Exq.Support.Job{args: [%{"job_id" => 2, "receipt_id" => 341, "trigger_id" => 1, "type" => "ReceiptMatch"}], class: "OpenFn.JobRunner", enqueued_at: 1522354133.120197, error_class: nil, error_message: nil, failed_at: nil, finished_at: nil, jid: "73b7faf7-9f23-4a0b-9ed8-97264d809cbc", processor: nil, queue: "browser_runs", retry: 0, retry_count: nil}, pid: #PID<0.959.0>, started_at: 1522354133.142235}, queue: "browser_runs", redis: Exq.Redis.Client, started_at: #DateTime<2018-03-29 20:08:53.156974Z>, stats: Exq.Stats, worker_module: OpenFn.JobRunner}, event: :before_work, halted: false, terminated: false, 
worker_pid: #PID<0.959.0>}, queue: "browser_runs", redis: Exq.Redis.Client, stats: Exq.Stats, work_table: #Reference<0.2669252645.2345271297.158598>}
Client #PID<0.907.0> is alive
    (stdlib) gen.erl:169: :gen.do_call/4
    (elixir) lib/gen_server.ex:771: GenServer.call/3
    (stdlib) erl_eval.erl:670: :erl_eval.do_apply/6
    (elixir) src/elixir.erl:239: :elixir.eval_forms/4
    (iex) lib/iex/evaluator.ex:219: IEx.Evaluator.handle_eval/5
    (iex) lib/iex/evaluator.ex:200: IEx.Evaluator.do_eval/3
    (iex) lib/iex/evaluator.ex:178: IEx.Evaluator.eval/3
    (iex) lib/iex/evaluator.ex:77: IEx.Evaluator.loop/1
    (iex) lib/iex/evaluator.ex:21: IEx.Evaluator.init/4
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
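The traces above point to two separate problems. Exq.Api.clear_processes/1 is meant to be called with the Exq.Api server (as in the working call further up); given the worker's PID instead, the :clear_processes call lands in Exq.Worker.Server.handle_call/3, which has no matching clause, and the worker crashes. Separately, the pid field is a string. A minimal sketch of turning it back into a PID without IEx.Helpers uses :erlang.list_to_pid/1. The module name PidString is hypothetical, and, like IEx.Helpers.pid, list_to_pid is documented by OTP as a debugging aid rather than something for application code. The string is also only meaningful on the node that produced it, so checking the process's host before acting on it seems prudent.

```elixir
defmodule PidString do
  # Hypothetical helper, not part of Exq.

  # Turns a string such as "<0.959.0>" (the format returned in the process
  # list) into a PID on the local node. :erlang.list_to_pid/1 expects the
  # angle brackets to be present and raises ArgumentError on malformed input.
  def parse(pid_string) when is_binary(pid_string) do
    pid_string
    |> String.to_charlist()
    |> :erlang.list_to_pid()
  end
end

# Round-trip the current process through its string form.
pid_string = self() |> :erlang.pid_to_list() |> List.to_string()
PidString.parse(pid_string) == self()
# → true
```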

I'm really sorry if I've missed something in the docs, but I haven't been able to figure out how to kill (or configure Exq to automatically kill) processes that have been running for a long time. Ultimately, my objective here is to configure a queue that will kill anything that lasts more than X seconds.

Update: I found @akira's helpful comment on #228 where he suggested :timer.kill_after(:timer.seconds(600)) as a way to auto-kill long-running jobs. (Thank you!) I'm still wondering whether it is possible to kill a single process by hand.

Thanks in advance!
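:timer.kill_after/1 sends a :kill exit signal to the calling process once the given number of milliseconds has passed, so it only bounds a job if it is called from the process that actually runs the job. The sketch below (KillAfterDemo is hypothetical, and a plain spawned process stands in for an Exq job) shows the two outcomes as seen by a monitor:

```elixir
defmodule KillAfterDemo do
  # Spawns a stand-in "job" that arms :timer.kill_after/1 on itself and then
  # works for work_ms milliseconds. Returns the exit reason seen by a monitor.
  def run(limit_ms, work_ms) do
    {pid, ref} =
      spawn_monitor(fn ->
        {:ok, _tref} = :timer.kill_after(limit_ms)
        Process.sleep(work_ms)
      end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    end
  end
end

# The job outlives its limit and is killed.
KillAfterDemo.run(50, 1_000)
# → :killed

# The job finishes before the limit and exits normally.
KillAfterDemo.run(1_000, 10)
# → :normal
```

If the process that armed the timer can outlive the job, calling :timer.cancel/1 with the returned reference stops the timer from firing later.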

taylordowns2000 commented 6 years ago

In case it's useful, this is getting me 90% of the way there, but the process still shows up in the Exq.Api.processes(Process.whereis(Exq.Api)) tuple, and I get the [info] Re-enqueueing job from backup for node_id [taylor...] and queue [runs] problem that others have faced.

  # Assumes the enclosing module calls require Logger.
  def kill_process(process) do
    Logger.info "Killing process."
    # process.pid is a string such as "<0.959.0>"; strip the brackets and turn it back into a PID.
    Process.exit(IEx.Helpers.pid(process.pid |> String.replace(~r/[<>]/, "")), :kill)
    # Remove the process entry from Redis, then update Exq's stats, recording the kill as a failure.
    Exq.Redis.JobStat.remove_process(Exq.Redis.Client, Application.get_env(:exq, :namespace), process, Exq.Support.Process.encode(process))
    Exq.Stats.Server.process_terminated(Exq.Stats, Application.get_env(:exq, :namespace), process)
    Exq.Stats.Server.record_failure(Exq.Stats, Application.get_env(:exq, :namespace), "I killed it.", process.job)
    # Exq.Redis.Connection.flushdb!(Exq.Redis.Client)
  end

I don't want to call flushdb! at the end because that seems to drop everything from Redis, not just the single job I killed. I've been scanning more issues and realize this is really similar to #305.

To be honest, it still feels like I'm missing something obvious, as if I'm trying to find a back door for something that should be straightforward! Apologies again if I'm just missing it.
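Process.exit/2 returns as soon as the exit signal is sent, not when the target has finished terminating. Before cleaning up Redis entries for a killed worker, it may help to wait for confirmation through a monitor. A minimal sketch; the module name KillAndConfirm and the 5-second default timeout are illustrative choices, not Exq API:

```elixir
defmodule KillAndConfirm do
  # Hypothetical helper, not part of Exq.

  # Sends an untrappable :kill signal and waits until the monitor reports the
  # process as down. A process that is already dead yields a :DOWN message
  # right away (reason :noproc), so this also returns :ok in that case.
  def kill(pid, timeout_ms \\ 5_000) when is_pid(pid) do
    ref = Process.monitor(pid)
    Process.exit(pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    after
      timeout_ms ->
        # Drop the monitor and any :DOWN message that raced in.
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end
end

pid = spawn(fn -> Process.sleep(:infinity) end)
KillAndConfirm.kill(pid)
# → :ok
```

Only after :ok comes back would the bookkeeping calls (removing the process entry, updating stats) run.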

robobakery commented 6 years ago

You might want to check out Exq.Redis.JobQueue.remove_job_from_backup/5:

  # Converts the "<0.959.0>" string stored for the process back into a PID.
  defp extract_pid(process) do
    process.pid |> String.replace(~r/[<>]/, "") |> IEx.Helpers.pid()
  end

  def kill_process(process) do
    # Kill the worker immediately; a :kill signal cannot be trapped.
    process
    |> extract_pid()
    |> Process.exit(:kill)

    # Rebuild the serialized job so it can be removed from this host's backup queue.
    job_serialized =
      process.job
      |> Map.take(["queue", "retry", "class", "args", "jid", "enqueued_at"])
      |> Exq.Support.Config.serializer.encode!()

    Exq.Redis.JobQueue.remove_job_from_backup(
      Exq.Redis.Client, Application.get_env(:exq, :namespace), process.host, process.job["queue"], job_serialized)

    # Remove the entry that Exq.Api.processes reports.
    process_serialized =
      process
      |> Exq.Support.Config.serializer.encode!()

    Exq.Redis.JobStat.remove_process(
      Exq.Redis.Client, Application.get_env(:exq, :namespace), process, process_serialized)

    # Notify the manager that the job has terminated.
    Exq.Manager.Server.job_terminated(Exq, Application.get_env(:exq, :namespace), process.job["queue"], job_serialized)
  end
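One thing to watch in this approach: job_serialized is rebuilt from a decoded map rather than taken verbatim. If remove_job_from_backup/5 matches the backup entry by exact string (an assumption here, though that is how Redis LREM behaves), the re-encoded JSON has to be byte-identical to what was originally pushed. The job_serialized in the worker state dump lists its keys as retry, queue, jid, enqueued_at, class, args, but an Elixir map does not remember insertion order, so an encoder that walks the map will generally emit the keys in a different order. A small illustration in plain Elixir, with no Exq or JSON library involved:

```elixir
# Key order of the job JSON as it appears in the worker state dump.
original_order = ["retry", "queue", "jid", "enqueued_at", "class", "args"]

# Build a map from the same keys, inserted in that order.
job =
  Map.new([
    {"retry", 0},
    {"queue", "browser_runs"},
    {"jid", "73b7faf7-9f23-4a0b-9ed8-97264d809cbc"},
    {"enqueued_at", 1522354133.120197},
    {"class", "OpenFn.JobRunner"},
    {"args", [%{"type" => "ReceiptMatch"}]}
  ])

# The map holds the same keys but does not preserve their insertion order.
Map.keys(job) == original_order
# → false
```

Where the original serialized string is available, passing it through unchanged sidesteps the issue.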