brndnmtthws / citrine

Elixir library for running cron-based scheduled jobs on your Erlang cluster
https://hexdocs.pm/citrine/readme.html
MIT License

(FunctionClauseError) no function clause matching in Citrine.JobExecutor.handle_info/2 #1

Closed: amacgregor closed this issue 3 years ago

amacgregor commented 4 years ago

Failure after first execution:

[debug] finished job id=job_siteguardian.dev in 1.336468s
[debug] terminating Citrine.JobExecutor with reason: {:function_clause, [{Citrine.JobExecutor, :handle_info, [{:EXIT, #Port<0.21>, :normal}, %{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_siteguardian.dev", schedule: "* * * * *", task: #Function<1.73740159/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.3764802182.1941176323.235445>}], [file: 'lib/citrine/job_executor.ex', line: 101]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 680]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 756]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]} and state=%{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_siteguardian.dev", schedule: "* * * * *", task: #Function<1.73740159/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.3764802182.1941176323.235445>}
[error] GenServer {Siteguardian.Scheduler.Registry, "job_siteguardian.dev"} terminating
** (FunctionClauseError) no function clause matching in Citrine.JobExecutor.handle_info/2
    (citrine 0.1.11) lib/citrine/job_executor.ex:101: Citrine.JobExecutor.handle_info({:EXIT, #Port<0.21>, :normal}, %{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_siteguardian.dev", schedule: "* * * * *", task: #Function<1.73740159/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.3764802182.1941176323.235445>})
    (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #Port<0.21>, :normal}
State: %{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_siteguardian.dev", schedule: "* * * * *", task: #Function<1.73740159/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.3764802182.1941176323.235445>}

Not sure what I'm doing wrong here, or why executing my task fails after the first run.

brndnmtthws commented 4 years ago

It looks like it's not handling the :EXIT message here (which is a bug), but I'm not sure why it's terminating there. Do you know why it is terminating? Is the supervisor shutting down for some reason? Can you share the code for Siteguardian.Scheduler.Registry and Siteguardian.Application.start?

amacgregor commented 4 years ago

Application start code:

  def start(_type, _args) do
    children = [
      # Start the Ecto repository
      Siteguardian.Repo,
      # Start the Telemetry supervisor
      SiteguardianWeb.Telemetry,
      # Start the PubSub system
      {Phoenix.PubSub, name: Siteguardian.PubSub},
      # Start the Endpoint (http/https)
      SiteguardianWeb.Endpoint,
      # Start the Citrine Scheduler
      Siteguardian.Scheduler,
      # Start a Command Runner
      Siteguardian.CommandRunner,
      # Start a worker by calling: Siteguardian.Worker.start_link(arg)
      # {Siteguardian.Worker, arg}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Siteguardian.Supervisor]
    start_val = Supervisor.start_link(children, opts)

    for domain <- Siteguardian.Automation.list_active_domains() do
       Siteguardian.Scheduler.put_job(%Citrine.Job{
          id: "job_#{domain.name}",
          schedule: "* * * * *", # Run every minute (five-field cron expression)
          task: fn -> TaskRunner.start(domain) end,
        })
    end

    start_val
  end

I have been able to narrow it down somewhat, and it seems to be related to the following snippet of code:

    case System.cmd("/bin/sh", ["-c", command]) do
      {output, 0} ->
        output
        |> format_output
        |> output_to_map(%{})
        |> IO.inspect()

        {:ok, domain}
      _ ->
        IO.inspect("Nope")
        {:error, "Error: unable to load certificate"}
    end

That snippet is called as part of the task execution; without it, everything seems to work fine.

brndnmtthws commented 4 years ago

Ah okay, I see what's happening. Internally System.cmd uses Port.

Here's a note about it:

Internally, this function uses a Port for interacting with the outside world. However, if you plan to run a long-running program, ports guarantee stdin/stdout devices will be closed but it does not automatically terminate the program. The documentation for the Port module describes this problem and possible solutions under the "Zombie processes" section.

And, as per the Port docs:

On its turn, the port will send the connected process the following messages:

  • {port, {:data, data}} - data sent by the port
  • {port, :closed} - reply to the {pid, :close} message
  • {port, :connected} - reply to the {pid, {:connect, new_pid}} message
  • {:EXIT, port, reason} - exit signals in case the port crashes. If reason is not :normal, this message will only be received if the owner process is trapping exits

I'm not 100% sure what the idiomatic Elixir way to handle this is. If I were to guess, I think your code should wrap the System.cmd call with a Task so it doesn't propagate to the executor. I also think the commit I just added (30f4c57e3cf0c2584731844907feb61dd061fc17) was probably not necessary.
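To make the last bullet concrete, here is a rough sketch (not Citrine code) of what a process that traps exits sees when a port's program finishes. The module name `PortExitDemo` is made up for illustration, and the example assumes a Unix-like system with `/bin/sh`:

```elixir
defmodule PortExitDemo do
  # Hypothetical demo module; not part of Citrine or Elixir.
  # Opens a port from a process that traps exits and returns, in order,
  # the messages that process receives from the port.
  def run do
    parent = self()

    spawn(fn ->
      # Same situation as the executor in the log above: exits are trapped,
      # so exit signals arrive as ordinary messages.
      Process.flag(:trap_exit, true)

      port =
        Port.open(
          {:spawn_executable, "/bin/sh"},
          [:binary, :exit_status, args: ["-c", "echo hi"]]
        )

      send(parent, {:messages, collect(port, [])})
    end)

    receive do
      {:messages, messages} -> messages
    after
      5_000 -> :timeout
    end
  end

  # Collects port messages until the exit signal arrives (or a timeout hits).
  defp collect(port, acc) do
    receive do
      {^port, {:data, data}} -> collect(port, [{:data, data} | acc])
      {^port, {:exit_status, status}} -> collect(port, [{:exit_status, status} | acc])
      {:EXIT, ^port, reason} -> Enum.reverse([{:exit, reason} | acc])
    after
      2_000 -> Enum.reverse(acc)
    end
  end
end
```

Calling `PortExitDemo.run()` should return a list that ends with `{:exit, :normal}`, which corresponds to the `{:EXIT, #Port<...>, :normal}` message in the crash report above.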

amacgregor commented 4 years ago

Thanks for this. I was also looking into a DynamicSupervisor to hand the command execution off to a worker.

brndnmtthws commented 4 years ago

The easiest way may be Task.Supervisor.
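A minimal sketch of that idea, assuming a `Task.Supervisor` is already running. The module name `IsolatedCommand` and the 10 second timeout are assumptions for illustration, not anything Citrine provides. `Task.Supervisor.async_nolink/2` starts the task without linking it to the caller, so neither the task's exit nor the port's exit signal should reach the job executor:

```elixir
defmodule IsolatedCommand do
  # Hypothetical helper for illustration; not part of Citrine.
  # Runs a shell command inside a supervised task that is NOT linked to the
  # calling process. The port opened by System.cmd/2 is linked to the task
  # process, not to the caller.
  def run(supervisor, command) do
    task =
      Task.Supervisor.async_nolink(supervisor, fn ->
        System.cmd("/bin/sh", ["-c", command])
      end)

    # The caller only monitors the task. If the task crashes, Task.await/2
    # exits in the caller, so wrap this call if crashes should be tolerated.
    Task.await(task, 10_000)
  end
end
```

In an application the supervisor would normally sit in the supervision tree, for example as `{Task.Supervisor, name: MyApp.TaskSupervisor}` (a hypothetical name), and the job's `task` function would call `IsolatedCommand.run(MyApp.TaskSupervisor, command)`.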

amacgregor commented 3 years ago

    Task.async(fn -> execute_command(command) end)
    |> Task.await()
    |> IO.inspect()

This still results in:

[error] GenServer {Siteguardian.Scheduler.Registry, "job_coderoncode.com"} terminating
** (FunctionClauseError) no function clause matching in Citrine.JobExecutor.handle_info/2
    (citrine 0.1.11) lib/citrine/job_executor.ex:101: Citrine.JobExecutor.handle_info({:EXIT, #PID<0.622.0>, :normal}, %{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_coderoncode.com", schedule: "* * * * *", task: #Function<1.74548289/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.589530647.1897136130.163894>})
    (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.622.0>, :normal}
State: %{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_coderoncode.com", schedule: "* * * * *", task: #Function<1.74548289/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.589530647.1897136130.163894>}

brndnmtthws commented 3 years ago

You need something to handle that exit event, or else it will bubble up. I'm actually not sure whether the unhandled event is harmful; does it keep running after that?

In any case, you need to wrap your task with something that can trap the exits and handle them accordingly.
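For context on why `Task.async` alone does not help: it links the task to the caller, so a caller that traps exits still gets an `{:EXIT, pid, :normal}` message once the task finishes, even after `Task.await` has returned the result. A small sketch (the module name `LinkedTaskDemo` is made up for illustration):

```elixir
defmodule LinkedTaskDemo do
  # Hypothetical demo module; not part of Citrine.
  # Returns {task_result, leftover}, where leftover describes the exit
  # message still sitting in the caller's mailbox after Task.await/1.
  def run do
    parent = self()

    spawn(fn ->
      Process.flag(:trap_exit, true)

      # Task.async/1 links the task process to this (trapping) process.
      %Task{pid: task_pid} = task = Task.async(fn -> :done end)
      result = Task.await(task)

      # Task.await/1 consumes the reply, not the exit signal from the link.
      leftover =
        receive do
          {:EXIT, ^task_pid, reason} -> {:exit, reason}
        after
          1_000 -> :nothing
        end

      send(parent, {:demo, result, leftover})
    end)

    receive do
      {:demo, result, leftover} -> {result, leftover}
    after
      5_000 -> :timeout
    end
  end
end
```

Here `LinkedTaskDemo.run()` should return `{:done, {:exit, :normal}}`, which is the same kind of message as the `{:EXIT, #PID<0.622.0>, :normal}` in the log above.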

amacgregor commented 3 years ago

Yeah, that's the tricky bit. I'm having a hard time finding the idiomatic way to handle that. That said, I did try adding the handle_info from your earlier commit, which kind of works: it handles the exit, but it also terminates the job.

  @impl true
  def handle_info({:EXIT, _pid, :normal}, state) do
    {:stop, :normal, state}
  end

[debug] finished job id=job_siteguardian.dev in 0.468161s
[debug] terminating Citrine.JobExecutor with reason: :normal and state=%{cron_expr: ~e[* * * * * *], job: %Citrine.Job{extended_syntax: false, id: "job_siteguardian.dev", schedule: "* * * * *", task: #Function<1.47134453/0 in Siteguardian.Application.start/2>}, registry: Siteguardian.Scheduler.Registry, timer: #Reference<0.3858931454.2454716417.247372>}

I could in theory use my fork and {:EXIT, _pid, :normal} to reschedule the job, but that feels kind of wrong.
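For comparison, a clause that returns `{:noreply, state}` instead of `{:stop, :normal, state}` swallows the normal exit and leaves the process running. The sketch below uses a made-up GenServer, `TolerantExecutor`, to show the difference; it is not Citrine's `JobExecutor` and makes no claim about how the library should handle this:

```elixir
defmodule TolerantExecutor do
  # Hypothetical stand-in for a job executor; not Citrine code.
  use GenServer

  def start_link(fun), do: GenServer.start_link(__MODULE__, fun)
  def run(pid), do: GenServer.call(pid, :run)
  def run_count(pid), do: GenServer.call(pid, :run_count)

  @impl true
  def init(fun) do
    # Trapping exits turns exit signals from linked tasks and ports
    # into {:EXIT, from, reason} messages.
    Process.flag(:trap_exit, true)
    {:ok, %{fun: fun, runs: 0}}
  end

  @impl true
  def handle_call(:run, _from, state) do
    result = state.fun.()
    {:reply, result, %{state | runs: state.runs + 1}}
  end

  def handle_call(:run_count, _from, state), do: {:reply, state.runs, state}

  @impl true
  # A linked task or port finished cleanly: ignore it and keep running.
  def handle_info({:EXIT, _from, :normal}, state), do: {:noreply, state}
  # Anything abnormal still stops the process with that reason.
  def handle_info({:EXIT, _from, reason}, state), do: {:stop, reason, state}
end
```

With this clause, a job function that internally uses `Task.async` can be run repeatedly without stopping the process.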

brndnmtthws commented 3 years ago

There's a discussion of a similar issue here: https://elixirforum.com/t/supervising-async-tasks/14412/6

I think if it were me, the easiest thing would be to write a tiny GenServer that is wrapped by a Task and then you can trap the exit and handle it accordingly there. There's an example in the thread I just linked to which you could adapt.
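Something along these lines, as a rough sketch rather than the code from that thread. The module name `CommandWorker`, its `run/3` function, and the 30 second default timeout are all made up for illustration, and the example assumes a Unix-like system with `/bin/sh`. The worker owns the port, traps exits, and discards the normal exit message, so the caller only ever sees the reply:

```elixir
defmodule CommandWorker do
  # Hypothetical worker for illustration; not part of Citrine.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Runs a shell command inside the worker process and returns
  # {:ok, output} or {:error, {exit_status, output}}.
  def run(server, command, timeout \\ 30_000) do
    GenServer.call(server, {:run, command}, timeout)
  end

  @impl true
  def init(:ok) do
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  @impl true
  def handle_call({:run, command}, _from, state) do
    # The port created by System.cmd/2 is linked to this worker,
    # not to the process that called run/3.
    reply =
      case System.cmd("/bin/sh", ["-c", command]) do
        {output, 0} -> {:ok, output}
        {output, status} -> {:error, {status, output}}
      end

    {:reply, reply, state}
  end

  @impl true
  # Normal exit of a port (or any other linked process): nothing to do.
  def handle_info({:EXIT, _port_or_pid, :normal}, state), do: {:noreply, state}
  # Ignore any other stray message instead of crashing.
  def handle_info(_other, state), do: {:noreply, state}
end
```

The job's task function would then call `CommandWorker.run(worker, command)`. Because a single worker handles calls one at a time, concurrent jobs would queue behind each other, so a pool or one worker per job may be needed at scale.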

amacgregor commented 3 years ago

@brndnmtthws Thank you, that last thread was exactly what I needed, and this is working for now (I think the GenServer might need some tweaking for scale).

Thanks again, I'm going to close the issue.