quantum-elixir / quantum-core

:watch: Cron-like job scheduler for Elixir
https://hexdocs.pm/quantum/
Apache License 2.0

"Jobs were skipped" error #441

Closed · sezaru closed this issue 4 years ago

sezaru commented 4 years ago

Hey there, after adding my Mnesia storage to my Quantum scheduler and leaving my dev backend shut down for some time, I get this error every time I start it up again:

13:33:49.373 [error] #PID<0.1026.0> :gen_server "error_info/7" "gen_server.erl" 889 
↳ GenServer Pipeline.Workers.Scheduler.ExecutionBroadcaster terminating
** (RuntimeError) Jobs were skipped
    (quantum 3.0.0) lib/quantum/execution_broadcaster.ex:181: Quantum.ExecutionBroadcaster.execute_events_to_fire/2
    (quantum 3.0.0) lib/quantum/execution_broadcaster.ex:80: anonymous fn/2 in Quantum.ExecutionBroadcaster.handle_events/3
    (elixir 1.10.2) lib/enum.ex:2111: Enum."-reduce/3-lists^foldl/2-0-"/3
    (quantum 3.0.0) lib/quantum/execution_broadcaster.ex:79: Quantum.ExecutionBroadcaster.handle_events/3
    (gen_stage 1.0.0) lib/gen_stage.ex:2395: GenStage.consumer_dispatch/6
    (gen_stage 1.0.0) lib/gen_stage.ex:2574: GenStage.take_pc_events/3
    (stdlib 3.12.1) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib 3.12.1) gen_server.erl:711: :gen_server.handle_msg/6
Last message: {:"$gen_consumer", {#PID<0.1023.0>, #Reference<0.3396619105.1127743495.232289>}, [%Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:40]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:39]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:38]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:37]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:36]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:35]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:34]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:33]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:32]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:31]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:30]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:29]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:28]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:27]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:26]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:25]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:24]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:23]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:22]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:21]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:20]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:19]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:18]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:17]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:16]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:15]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:14]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:13]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:12]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:11]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:10]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:09]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:08]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:07]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:06]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:05]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:04]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:03]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:02]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:01]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:07:00]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:06:59]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:06:58]}, 
%Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:06:57]}, %Quantum.ClockBroadcaster.Event{catch_up: true, time: ~N[2020-06-15 05:06:56]}, %Quantum.ClockBroadcaster.Event{catch_up: true, ...}, %Quantum.ClockBroadcaster.Event{...}, ...]}
State: %Quantum.ExecutionBroadcaster.State{debug_logging: true, execution_timeline: [{~N[2020-06-15 05:01:00], [%Quantum.Job{name: :ticker, overlap: true, run_strategy: %Quantum.RunStrategy.Random{nodes: :cluster}, schedule: ~e[1 * * * * *], state: :active, task: #Function<21.126501267/0 in :erl_eval.expr/5>, timezone: :utc}, %Quantum.Job{name: #Reference<0.740557040.3108503554.175725>, overlap: true, run_strategy: %Quantum.RunStrategy.Random{nodes: :cluster}, schedule: ~e[1 * * * * *], state: :active, task: #Function<21.126501267/0 in :erl_eval.expr/5>, timezone: :utc}]}], scheduler: Pipeline.Workers.Scheduler, storage: QuantumStorageMnesia, storage_pid: #PID<0.1017.0>, uninitialized_jobs: []}

Looking at the code at execution_broadcaster.ex:181, my understanding is that if a job was scheduled to run while the backend was shut down, I will get this message when I start it up again, and the message should disappear the next time that job is executed.

Is this assumption correct, and is the error just a harmless warning, or is it a bigger issue that should be handled somehow?

maennchen commented 4 years ago

@sezaru That hints at a bigger problem.

The basic idea is that all missed jobs are caught up until the current time. Is it possible that Quantum.Storage.last_execution_date/1 is implemented incorrectly?
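For reference, this is roughly the contract I'd expect the storage to fulfil. A minimal sketch, assuming the quantum 3.0 callback names and return values (the actual persistence, e.g. Mnesia, is left out):

    # Hedged sketch of the storage contract as I understand it for quantum 3.0;
    # real implementations persist these values instead of hard-coding them.
    defmodule StorageContractSketch do
      # Nothing persisted yet: the scheduler falls back to the config jobs and
      # starts the clock at "now".
      def jobs(_storage_pid), do: :not_applicable
      def last_execution_date(_storage_pid), do: :unknown

      # While the scheduler runs, the last processed tick is written here, and
      # last_execution_date/1 must return that value (a NaiveDateTime) after a
      # restart so the catch-up knows where to resume from.
      def update_last_execution_date(_storage_pid, %NaiveDateTime{}), do: :ok
    end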

sezaru commented 4 years ago

I will take a closer look!

Also, just to make sure: when the system runs purge, does that mean it removes all jobs and also the last_execution_date?

sezaru commented 4 years ago

So, I took a closer look at my code, but it seems to be fine; at least it does the same thing as QuantumStoragePersistentEts.

I inserted some IO.inspect calls where the error happens to capture the state and time.

These are the values a few minutes before the error:

state = %Quantum.ExecutionBroadcaster.State{
  debug_logging: true,
  execution_timeline: [
    {~N[2020-06-15 18:00:00],
     [
       %Quantum.Job{
         name: #Reference<0.1797575718.2215116807.66788>,
         overlap: true,
         run_strategy: %Quantum.RunStrategy.Random{nodes: :cluster},
         schedule: ~e[0 * * * * *],
         state: :active,
         task: {Pipeline.Workers.DropExpiredAlerts, :run, [:one_minute]},
         timezone: :utc
       }
     ]}
  ],
  scheduler: Pipeline.Workers.Scheduler,
  storage: QuantumStorageMnesia,
  storage_pid: #PID<0.1010.0>,
  uninitialized_jobs: []
}

time = ~N[2020-06-15 17:57:01]

As you can see, time was ~N[2020-06-15 17:57:01] and time_to_execute was ~N[2020-06-15 18:00:00].

After that, I closed the app, waited for the time to pass ~N[2020-06-15 18:00:00], and then restarted the app, which triggered the error with these values:

state = %Quantum.ExecutionBroadcaster.State{
  debug_logging: true,
  execution_timeline: [
    {~N[2020-06-15 18:00:00],
     [
       %Quantum.Job{
         name: #Reference<0.3391653494.335544324.184807>,
         overlap: true,
         run_strategy: %Quantum.RunStrategy.Random{nodes: :cluster},
         schedule: ~e[0 * * * * *],
         state: :active,
         task: {Pipeline.Workers.DropExpiredAlerts, :run, [:one_minute]},
         timezone: :utc
       }
     ]}
  ],
  scheduler: Pipeline.Workers.Scheduler,
  storage: QuantumStorageMnesia,
  storage_pid: #PID<0.1385.0>,
  uninitialized_jobs: []
}

time = ~N[2020-06-15 18:02:02]

This time, time was ~N[2020-06-15 18:02:02] and time_to_execute was still ~N[2020-06-15 18:00:00].

Now, is this really wrong? I mean, if the app had been running, execute_events_to_fire/2 would have been called at ~N[2020-06-15 18:00:00] and would have executed the job. Since I only reopened the app after that time, the function was never triggered, and that's why the job was skipped.

Is this really a bug? I mean, what should really happen in this case?

maennchen commented 4 years ago

@sezaru In this case the clock broadcaster should restart at the time it was shut down and basically step through all the dates between the last run date and now at a faster pace.
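To illustrate the idea, here is a rough standalone sketch with made-up times (not the actual ClockBroadcaster code):

    # After a restart, emit one event per second from the last known time up to
    # "now" (flagged catch_up: true in the real code) so no run in between is
    # skipped, then continue in real time.
    last_known = ~N[2020-06-15 18:00:00]
    now = ~N[2020-06-15 18:02:02]

    catch_up_times =
      last_known
      |> Stream.iterate(&NaiveDateTime.add(&1, 1, :second))
      |> Stream.drop(1)
      |> Enum.take_while(&(NaiveDateTime.compare(&1, now) == :lt))

    # catch_up_times is chronological:
    # [~N[2020-06-15 18:00:01], ~N[2020-06-15 18:00:02], ..., ~N[2020-06-15 18:02:01]]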

sezaru commented 4 years ago

Oh, I see.

Where does this happen in the Quantum code? I will try debugging that part to see what the storage is returning to Quantum that keeps it from running this path correctly.

sezaru commented 4 years ago

So, I think I found two issues. The first one is in job_broadcaster.ex, in init/1.

The code calls the storage to get the list of jobs; if :not_applicable is returned, the config jobs are used.

The issue is that the jobs from the config are never added to the storage; only new jobs added via the scheduler's add_job function end up there. (This will actually break the config jobs: the next time this function is called, the storage will return the manually added job and the config jobs will never be used again.)

To fix that I added this to the :not_applicable case:

        :not_applicable ->
          debug_logging &&
            Logger.debug(fn ->
              "[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config"
            end)

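          # persist the config jobs so the next boot finds them in the storage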
          Enum.each(jobs, fn job -> :ok = storage.add_job(storage_pid, job) end)

          jobs

Now, if there are no jobs in the storage, the ones from the config will be added to it.

Note that this doesn't handle the case where you change/add/remove a job in your config file; I'm not sure what the best approach to merge them is.

Maybe a better way is to not add the config jobs to the storage (as is the default), but to change the other clause of the function instead:

        storage_jobs when is_list(storage_jobs) ->
          debug_logging &&
            Logger.debug(fn ->
              "[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
            end)

          jobs ++ storage_jobs

So in this case, instead of adding the config jobs to the storage, we simply return the config jobs and the storage jobs as one concatenated list.

The next issue I found is that during the catch-up phase the events are sent in reverse order. The events are aggregated in clock_broadcaster.ex, in handle_demand at line 67, inside an Enum.reduce_while:

    {events, new_time} =
      Enum.reduce_while(
        1..expected_event_count,
        {[], time},
        fn _, {list, time} = acc ->
          new_time = NaiveDateTime.add(time, 1, :second)

          case NaiveDateTime.compare(new_time, now) do
            :lt ->
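              # the new event is prepended, so the accumulated list ends up newest-first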
              {:cont, {[%Event{time: new_time, catch_up: true} | list], new_time}}

            _ ->
              {:halt, acc}
          end
        end
      )

The issue here is that it creates events by adding one second, but it prepends each event to the list (for performance reasons), so the events end up in reverse chronological order. Adding the line below right after the Enum.reduce_while fixes it by reversing the list back into the correct order:

    events = Enum.reverse(events)
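Here is a tiny standalone sketch (made-up times, not the Quantum code) showing why the reverse is needed: prepending while stepping forward one second builds the list newest-first, and Enum.reverse/1 puts it back in chronological order.

    {times, _} =
      Enum.reduce(1..3, {[], ~N[2020-06-15 18:00:00]}, fn _, {list, time} ->
        new_time = NaiveDateTime.add(time, 1, :second)
        # prepend, just like the reduce_while above
        {[new_time | list], new_time}
      end)

    times
    #=> [~N[2020-06-15 18:00:03], ~N[2020-06-15 18:00:02], ~N[2020-06-15 18:00:01]]

    Enum.reverse(times)
    #=> [~N[2020-06-15 18:00:01], ~N[2020-06-15 18:00:02], ~N[2020-06-15 18:00:03]]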

With that, I don't get the "Jobs were skipped" error anymore, since the catch-up is now handled in the correct order and the jobs are executed correctly.

maennchen commented 4 years ago

@sezaru

> Note that this doesn't handle the case where you change/add/remove a job in your config file; I'm not sure what the best approach to merge them is.

This is a known problem; my decision was to add them to the storage and ignore the config when the storage is present.

> The next issue I found is that during the catch-up phase the events are sent in reverse order

Oh, good catch. Would you create a PR for that?

maennchen commented 4 years ago

@sezaru Thanks for the PR. Does this solve the complete problem for you?

maennchen commented 4 years ago

(If yes, I'll create a release.)

sezaru commented 4 years ago

> Thanks for the PR. Does this solve the complete problem for you?

Yep, now it is working great!

> This is a known problem; my decision was to add them to the storage and ignore the config when the storage is present.

Oh, indeed, I totally missed your note in the documentation about this...

But wouldn't concatenating the two job lists be a better solution? Is there some issue with that approach that I'm not seeing?

Also, if that is the case, my guess is that the way to use Quantum with the storage would be to have two schedulers, right? One for the config jobs, where the only thing in the storage is the last_execution_date, and another one without any config jobs that is used to add jobs at runtime, roughly like the sketch below.
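Something along these lines, as a rough sketch (module, app, and worker names are hypothetical, and it assumes the storage keeps separate state per scheduler):

    # config/config.exs; both schedulers are plain `use Quantum, otp_app: :my_app` modules.
    import Config

    # Scheduler for the static config jobs; no jobs are ever added to it at runtime.
    config :my_app, MyApp.ConfigScheduler,
      storage: QuantumStorageMnesia,
      jobs: [
        {"* * * * *", {MyApp.Workers.DropExpiredAlerts, :run, [:one_minute]}}
      ]

    # Scheduler with no config jobs; jobs are only added at runtime via add_job
    # and are persisted by the storage.
    config :my_app, MyApp.RuntimeScheduler,
      storage: QuantumStorageMnesia,
      jobs: []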

That said, maybe a good solution would be to issue a warning when the user has config jobs and uses the same scheduler to add runtime jobs, since they may not realize that the next time they restart the app their config jobs will not run anymore.

Maybe I should create an issue just for this topic, since this one is getting pretty big already.

maennchen commented 4 years ago

> But wouldn't concatenating the two job lists be a better solution? Is there some issue with that approach that I'm not seeing?

The problem is that config jobs are also not static: they can be enabled, disabled, deleted, or re-added. Therefore you can load jobs either from the config or from the storage, but not from a combination of both.

The docs can always be improved. You're welcome to create a new issue for this discussion, or to provide a PR describing this better.