sorentwo / oban

💎 Robust job processing in Elixir, backed by modern PostgreSQL and SQLite3
https://getoban.pro
Apache License 2.0

Oban Pro: jobs get stuck in executing and become orphans #1037

Closed. smt116 closed this issue 4 months ago.

smt116 commented 5 months ago

Environment

config [filtered], Oban,
  repo: [filtered],
  engine: Oban.Pro.Engines.Smart,
  notifier: Oban.Notifiers.PG,
  peer: Oban.Peers.Postgres,
  plugins: [
    {Oban.Pro.Plugins.DynamicCron, timezone: "America/New_York", crontab: crontab},
    {Oban.Plugins.Reindexer, timezone: "America/New_York", schedule: "@weekly"},
    Oban.Pro.Plugins.DynamicLifeline,
    {Oban.Pro.Plugins.DynamicPruner, mode: {:max_age, {7, :days}}}
  ],

Current Behavior

It looks like a regression, because downgrading Oban Pro to 1.2.2 fixes the issue.

Basically, almost all jobs get stuck in the executing state and then become orphans. A few complete, but only individual jobs. There is nothing relevant in the logs. I've tried deleting everything from the oban_* tables, but that doesn't fix the problem. I've tried running only a subset of queues with the size set to 1, but that doesn't work either.

I know this isn't much, but I'm not sure how to debug this behavior further. There are no issues in the logs on boot or during processing. Oban processes seem to be running fine (when checking the web dashboard).

Expected Behavior

Processing jobs ¯\_(ツ)_/¯.

sorentwo commented 5 months ago

You're seeing this for almost all jobs when running the latest Oban and Pro? There are large changes between Pro v1.3.2 and v1.3.5, so that's surprising.

Are there any error reports or exceptions that you can see? The only way I was able to reproduce this was by simulating repeated timeouts and database errors. After fixes in the latest Oban and Pro versions, no jobs were stuck despite timeouts or errors.

smt116 commented 5 months ago

You're seeing this for almost all jobs when running the latest Oban and Pro? There are large changes between Pro v1.3.2 and v1.3.5, so that's surprising.

I've just double-checked with:

Upgraded:
  oban 2.17.3 => 2.17.4
  oban_pro 1.3.2 => 1.3.5

Still the same issue:

2024-02-19.19:21:27 localhost: smefju@[filtered]-dev# select state, count(*) from oban_jobs group by state;
   state   | count
-----------+-------
 executing |   192
(1 row)

Time: 0,779 ms

Are there any error reports or exceptions that you can see? The only way I was able to reproduce this was by simulating repeated timeouts and database errors. After fixes in the latest Oban and Pro versions, no jobs were stuck despite timeouts or errors.

No errors, exceptions, or suspicious log entries. Only a single error from one job; that one is expected in the development environment of that application.

sorentwo commented 5 months ago

Are those jobs all in the same queue? How is that queue configured? Which type of jobs are they, e.g. Batch, Chunk, plain worker? Should the jobs all be succeeding? Do they have timeouts?

Can you provide an isolated reproduction?

smt116 commented 5 months ago

Are those jobs all in the same queue? How is that queue configured? Which type of jobs are they, e.g. Batch, Chunk, plain worker? Should the jobs all be succeeding? Do they have timeouts?

21 different queues with concurrency between 1 and 4. The issue persists even if I run only one queue with a size of one (by pausing the rest). All jobs should succeed. Workers are Oban.Worker or Oban.Pro.Worker. There are no timeouts.

I haven't tried to reproduce it in a fresh repo, but it might be somehow related to the host context. Our sandbox and production environments run on Linux, and the problem isn't there. Other developers use Linux too, and they said they can't reproduce it. It might be something specific to macOS (M1 Pro) with PostgreSQL via Docker Desktop.

sorentwo commented 5 months ago

It might be something specific to macOS (M1 Pro) with PostgreSQL via Docker Desktop.

That's interesting. On macOS (M3 Pro) with PostgreSQL I'm unable to reproduce it, but I haven't tried running the application or database in Docker.

Will you try setting ack_async: false to see if it makes any difference for you?
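For context, that option can go on each queue's keyword configuration when using the Smart engine. A minimal sketch (the app name, queue names, and limits are placeholders):

```elixir
# Hypothetical config; assumes the Smart engine's extended keyword
# queue options, with async acking disabled for each queue.
config :my_app, Oban,
  engine: Oban.Pro.Engines.Smart,
  queues: [
    default: [limit: 10, ack_async: false],
    mailers: [limit: 4, ack_async: false]
  ]
```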

smt116 commented 5 months ago

Will you try setting ack_async: false to see if it makes any difference for you?

Same issue after adding it to each queue.

sorentwo commented 5 months ago

Ok. And just to confirm, is this happening for all jobs, or some percentage?

smt116 commented 5 months ago

I would say that 98% get stuck. Some are processed, but I don't see any repeating pattern in the behavior.

sorentwo commented 5 months ago

I would say that 98% get stuck. Some are processed, but I don't see any repeating pattern in the behavior.

There's certainly something strange going on here, and it seems to be contextual/environmental. If you can provide a reproduction (even a docker compose), then I can try to recreate the issue and diagnose it.

sorentwo commented 4 months ago

@smt116 Has anything changed or are you able to provide some docker compose/config hints?

smt116 commented 4 months ago

Not yet. I will try to reproduce it with a new app today and let you know later.

smt116 commented 4 months ago

I can't reproduce it on a fresh mix new example --sup with the app running on the host and the same PostgreSQL server used by the app where the issue occurs. It has to be something specific to that application, but I have no idea how to debug it since Oban Pro 1.2.2 works fine and 1.3.5 does not 🤔.

diff --git a/config/config.exs b/config/config.exs
new file mode 100644
index 0000000..98f165c
--- /dev/null
+++ b/config/config.exs
@@ -0,0 +1,24 @@
+import Config
+
+config :example, Example.Repo,
+  username: "postgres",
+  password: "postgres",
+  database: "example_repo",
+  port: 5432,
+  hostname: "localhost"
+
+
+config :example, ecto_repos: [Example.Repo]
+
+config :example, Oban,
+  repo: Example.Repo,
+  engine: Oban.Pro.Engines.Smart,
+  notifier: Oban.Notifiers.PG,
+  peer: Oban.Peers.Postgres,
+  plugins: [
+    {Oban.Pro.Plugins.DynamicCron, crontab: [{"* * * * *", Example.Worker}]},
+    {Oban.Plugins.Reindexer, schedule: "@weekly"},
+    Oban.Pro.Plugins.DynamicLifeline,
+    {Oban.Pro.Plugins.DynamicPruner, mode: {:max_age, {7, :days}}}
+  ],
+  queues: [default: 10]
diff --git a/lib/example/application.ex b/lib/example/application.ex
index f3a78eb..86a3d63 100644
--- a/lib/example/application.ex
+++ b/lib/example/application.ex
@@ -8,6 +8,8 @@ defmodule Example.Application do
   @impl true
   def start(_type, _args) do
     children = [
+      Example.Repo,
+      {Oban, Application.fetch_env!(:example, Oban)}
       # Starts a worker by calling: Example.Worker.start_link(arg)
       # {Example.Worker, arg}
     ]
diff --git a/lib/example/repo.ex b/lib/example/repo.ex
new file mode 100644
index 0000000..8aa05c9
--- /dev/null
+++ b/lib/example/repo.ex
@@ -0,0 +1,5 @@
+defmodule Example.Repo do
+  use Ecto.Repo,
+    otp_app: :example,
+    adapter: Ecto.Adapters.Postgres
+end
diff --git a/lib/example/worker.ex b/lib/example/worker.ex
new file mode 100644
index 0000000..0e54322
--- /dev/null
+++ b/lib/example/worker.ex
@@ -0,0 +1,10 @@
+defmodule Example.Worker do
+  use Oban.Worker
+
+  require Logger
+
+  @impl Oban.Worker
+  def perform(_job) do
+    Logger.debug("OK")
+  end
+end
diff --git a/mix.exs b/mix.exs
index 72676c2..fcec995 100644
--- a/mix.exs
+++ b/mix.exs
@@ -22,8 +22,11 @@ defmodule Example.MixProject do
   # Run "mix help deps" to learn about dependencies.
   defp deps do
     [
-      # {:dep_from_hexpm, "~> 0.3.0"},
-      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
+      {:oban_pro, "~> 1.3", repo: "oban"},
+      {:oban_web, "~> 2.10.0-rc.2", repo: "oban"},
+      {:oban, "~> 2.17.1"},
+      {:ecto_sql, "~> 3.0"},
+      {:postgrex, ">= 0.0.0"}
     ]
   end
 end
diff --git a/mix.lock b/mix.lock
new file mode 100644
index 0000000..dea1687
--- /dev/null
+++ b/mix.lock
@@ -0,0 +1,24 @@
+%{
+  "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
+  "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
+  "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
+  "ecto": {:hex, :ecto, "3.11.1", "4b4972b717e7ca83d30121b12998f5fcdc62ba0ed4f20fd390f16f3270d85c3e", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ebd3d3772cd0dfcd8d772659e41ed527c28b2a8bde4b00fe03e0463da0f1983b"},
+  "ecto_sql": {:hex, :ecto_sql, "3.11.1", "e9abf28ae27ef3916b43545f9578b4750956ccea444853606472089e7d169470", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ce14063ab3514424276e7e360108ad6c2308f6d88164a076aac8a387e1fea634"},
+  "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
+  "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
+  "oban": {:hex, :oban, "2.17.4", "3ebe79dc0cad16f23e5feea418f9bc5b07d453b8fb7caf376d812be96157a5c5", [:mix], [{:ecto_sql, "~> 3.6", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "71a804abea3bb7e104782a5b5337cbab76c1a56b9689a6d5159a3873c93898b6"},
+  "oban_met": {:hex, :oban_met, "0.1.4", "9a95fa8a435c5ee4e475d1d626a917c7764ac1c46f7efd98b41e319d010f2871", [:mix], [{:oban, "~> 2.15", [hex: :oban, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "oban", "b9579fdb17166da925bf69dd1b848eb675d0920477758a4565e6a34974334aea"},
+  "oban_pro": {:hex, :oban_pro, "1.3.5", "b4d2fd01b379964e7823a6e4a4130991258cf9df011b932b0ba4fc7eeaf96587", [:mix], [{:ecto_sql, "~> 3.8", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.13", [hex: :libgraph, repo: "hexpm", optional: true]}, {:oban, "~> 2.17.3", [hex: :oban, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}], "oban", "43c6679af3f9ac8d56806332f7846f476da94fb72e6e60d97ea44c0c4166bd21"},
+  "oban_web": {:hex, :oban_web, "2.10.2", "95414ef8a0d34fe9b536bc32e85c735ab2dbcd85ca2354843a3270df7ff891e2", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, "~> 2.15", [hex: :oban, repo: "hexpm", optional: false]}, {:oban_met, "~> 0.1.2", [hex: :oban_met, repo: "oban", optional: false]}, {:phoenix, "~> 1.7.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.17", [hex: :postgrex, repo: "hexpm", optional: false]}], "oban", "e6a41b9c9cbb1966c2139fb9c538de4b3a917e03fb539d3e46f14bbead3149cf"},
+  "phoenix": {:hex, :phoenix, "1.7.11", "1d88fc6b05ab0c735b250932c4e6e33bfa1c186f76dcf623d8dd52f07d6379c7", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "b1ec57f2e40316b306708fe59b92a16b9f6f4bf50ccfa41aa8c7feb79e0ec02a"},
+  "phoenix_html": {:hex, :phoenix_html, "4.0.0", "4857ec2edaccd0934a923c2b0ba526c44a173c86b847e8db725172e9e51d11d6", [:mix], [], "hexpm", "cee794a052f243291d92fa3ccabcb4c29bb8d236f655fb03bcbdc3a8214b8d13"},
+  "phoenix_live_view": {:hex, :phoenix_live_view, "0.20.9", "46d5d436d3f8ff97f066b6c45528fd842a711fd3875b2d3f706b2e769ea07c51", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "694388615ece21b70c523910cba1c633132b08a270caaf60100dd4eaf331885d"},
+  "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"},
+  "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"},
+  "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"},
+  "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"},
+  "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"},
+  "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
+  "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
+  "websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
+}
diff --git a/priv/repo/migrations/20240221174352_add_oban_jobs_table.exs b/priv/repo/migrations/20240221174352_add_oban_jobs_table.exs
new file mode 100644
index 0000000..242cbd3
--- /dev/null
+++ b/priv/repo/migrations/20240221174352_add_oban_jobs_table.exs
@@ -0,0 +1,7 @@
+defmodule Example.Repo.Migrations.AddObanJobsTable do
+  use Ecto.Migration
+
+  def up do
+    Oban.Migration.up(version: 12)
+  end
+end
diff --git a/priv/repo/migrations/20240221174805_add_oban_producers.exs b/priv/repo/migrations/20240221174805_add_oban_producers.exs
new file mode 100644
index 0000000..5df06db
--- /dev/null
+++ b/priv/repo/migrations/20240221174805_add_oban_producers.exs
@@ -0,0 +1,5 @@
+defmodule Example.Repo.Migrations.AddObanProducers do
+  use Ecto.Migration
+
+  defdelegate change, to: Oban.Pro.Migrations.Producers
+end
diff --git a/priv/repo/migrations/20240221175839_add_oban_crons.exs b/priv/repo/migrations/20240221175839_add_oban_crons.exs
new file mode 100644
index 0000000..1688d8f
--- /dev/null
+++ b/priv/repo/migrations/20240221175839_add_oban_crons.exs
@@ -0,0 +1,5 @@
+defmodule Example.Repo.Migrations.AddObanCrons do
+  use Ecto.Migration
+
+  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
+end
sorentwo commented 4 months ago

Are there any other repo options in the original app? Anything out of the ordinary?

smt116 commented 4 months ago

Are there any other repo options in the original app?

I'm afraid not.

Anything out of the ordinary?

It uses the Ash framework.


I think I've found the issue. Jobs get stuck only if I enqueue all the relevant jobs (~60-90 at once) and drain the queues right after that. I was doing that by default in development, which is why I haven't seen this in the sandbox or production environments. Jobs run fine if I skip draining the queues and let the engine handle processing.
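Roughly, the triggering pattern looks like this (a simplified sketch; the worker module, count, and queue name are placeholders):

```elixir
# Enqueue a large batch of jobs at once...
jobs = Enum.map(1..90, &MyApp.SomeWorker.new(%{id: &1}))
Oban.insert_all(jobs)

# ...then immediately drain the queue while the real queue producers
# are also running and competing for the same jobs, which is what
# leaves most of them stuck in `executing`.
Oban.drain_queue(queue: :default)
```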

zachdaniel commented 4 months ago

In Ash we have two helpers for enqueueing jobs:

AshOban.Test.schedule_and_run_triggers

and

AshOban.schedule_and_run_triggers

AshOban.Test.schedule_and_run_triggers just calls AshOban.schedule_and_run_triggers with drain_queues?: true.

What AshOban.schedule_and_run_triggers ultimately does is create a bunch of relevant jobs and then, if drain_queues?: true, go through each relevant queue (twice) and drain it.

  defp drain_queues(queues, opts) do
    if opts[:drain_queues?] do
      Enum.reduce(queues ++ queues, default_acc(), fn queue, acc ->
        [queue: queue]
        |> Keyword.merge(
          Keyword.take(opts, [
            :queue,
            :with_limit,
            :with_recursion,
            :with_safety,
            :with_scheduled
          ])
        )
        |> Oban.drain_queue()
        |> Map.put(:queues_not_drained, [])
        |> merge_results(acc)
      end)
    else
      default_acc()
      |> Map.update!(:queues_not_drained, &Enum.uniq(&1 ++ queues))
    end
  end
sorentwo commented 4 months ago

That would do it. Draining queues is intended for testing, not production. It's in Oban rather than Oban.Testing for legacy reasons.

Oban.drain_queue/2 isn't aware of Smart engine options and doesn't disable async acking. However, Oban.Pro.Testing.drain_jobs/1 in Pro disables async acking because it's aware of the Smart engine. Either way, draining jobs from running queues could definitely cause problems like you're seeing.
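To illustrate the difference (a sketch, not a drop-in fix; verify the option names against the docs):

```elixir
# Pro's testing helper is Smart-engine aware and disables async
# acking while draining:
Oban.Pro.Testing.drain_jobs(queue: :default)

# The OSS drain is not aware of Smart engine options:
Oban.drain_queue(queue: :default, with_scheduled: true)
```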

zachdaniel commented 4 months ago

Okay, interesting. So AshOban has a configuration for whether or not the user has ObanPro. So my thought is that the drain_queues? option should behave a bit like this:

if oban_pro? do
  for queue <- queues do
    Oban.Pro.Testing.drain_jobs(queue: queue)
  end
else
  if <queues are running> do
    raise ArgumentError, "Cannot drain running queues without using Oban Pro"
  else
    for queue <- queues do
      Oban.drain_queue(queue: queue)
    end
  end
end

For <queues are running>, I should probably look for config :otp_app, Oban, testing: :manual, right?

sorentwo commented 4 months ago

So my thought is that the drain_queues? option should behave a bit like this:

That's an improvement, for sure. The Pro version has better defaults.

For <queues are running>, I should probably look for config :otp_app, Oban, testing: :manual, right?

That's fairly reliable, but not entirely, because it's possible to not run any queues in any mode, or run queues with DynamicQueues instead. The only surefire way is to query the registry for an instance:

match = [{{{conf.name, {:producer, :"$1"}}, :"$2", :_}, [], [{{:"$1", :"$2"}}]}]

Oban.Registry
|> Registry.select(match)
|> Enum.any?()

But I don't recommend that. We'll make a small tweak to the smart engine to work around draining when a queue is running, since there's nothing that officially says you can't do that.

zachdaniel commented 4 months ago

So the behavior I'm looking for is basically "wait until all of the jobs currently in the queue are done". I don't necessarily need the queue to be empty when it's over. Is there a better way to accomplish that?

sorentwo commented 4 months ago

Is there a better way to accomplish that?

Not really. Between scheduling, retries, and invisible jobs from other transactions there's no good way for you to know what all the jobs in the queue are, or a good way to check whether they're all done.

We'll make a small tweak to the smart engine to work around draining when a queue is running, since there's nothing that officially says you can't do that.

After double-checking, I remembered this isn't necessary. Pro's drain_jobs generates a unique name for the conf, so there's no chance it will see the actual running queues as alive.

I'm going to close this issue since there's nothing actually wrong with Oban.

@zachdaniel Feel free to reach out on Slack or the forum if you need any more help on this.