oban-bg / oban

💎 Robust job processing in Elixir, backed by modern PostgreSQL and SQLite3
https://oban.pro
Apache License 2.0

Oban Pro unique job conflict after upgrading from v1.5.0.rc-1 to v1.5.0.rc-4 #1160

Closed lamxw2 closed 1 month ago

lamxw2 commented 1 month ago

Precheck

Environment

Current Behavior

After upgrading Oban Pro from 1.5.0.rc-1 to 1.5.0.rc-4, I started getting a job conflict in an old test that passed on previous versions. The same upgrade also bumped our Oban version from 2.18.2 to 2.18.3. I have run the index migration that the changelog advises when upgrading to 1.5.0.rc-3.

I have 2 almost identical worker configs (with similar tests) that differ in their args and meta, plus other workers that check for uniqueness. All of them pass except for this one worker.

These are integration tests.

This is the Worker config for the failing test:

use Oban.Pro.Worker,
  queue: :plp_extracts,
  unique: [
    fields: [:args, :worker],
    keys: [:category_id],
    states: [:available, :executing, :retryable]
  ],
  max_attempts: 1

Worker config for the nearly identical worker that passes:

use Oban.Pro.Worker,
  queue: :plp_gtin_autogen,
  unique: [
    fields: [:args, :worker],
    keys: [:plp_extract_id],
    states: [:available, :executing, :retryable]
  ],
  max_attempts: 1

Code for the failing test and output:

test "can add the same job after completion", %{autogen_log_id: autogen_log_id} do
  DateTime.utc_now() |> IO.inspect(label: "failing test start of test")

  Queue.add_job(@plp_id_args, autogen_log_id, @meta)
  |> IO.inspect(label: "failing test 1st job")

  mock_plp_extract()

  # Check if autogen is completed
  stub_autogen_finished(self())

  assert %{success: 1} = Oban.drain_queue(queue: :plp_extracts, with_safety: false)

  DateTime.utc_now() |> IO.inspect(label: "failing test end of 1st job")

  assert {:ok, %Oban.Job{conflict?: false}} =
           Queue.add_job(@plp_id_args, autogen_log_id, @meta)
           |> IO.inspect(label: "failing test 2nd job")

  receive_autogen_finished()
  assert_receive(:final_batch_job)
end

[Three screenshots of the failing test output]

Code for the nearly identical passing test and output:

test "can add the same job after completion", %{
  autogen_log_id: autogen_log_id,
  plp_extract: plp_extract,
  listings_without_skus: listings_without_skus,
  pel_1: pel_1,
  pel_2: pel_2
} do
  DateTime.utc_now() |> IO.inspect(label: "passing test start of test")

  Queue.add_job(%{"plp_extract_id" => plp_extract.id}, autogen_log_id, @meta)
  |> IO.inspect(label: "passing test 1st job")

  load_data(listings_without_skus)
  mock_generate_from_gtin()
  mock_generate_from_test_extract(pel_1, pel_2)

  # Check if autogen is completed
  stub_autogen_finished(self())

  assert %{success: 1} = Oban.drain_queue(queue: :plp_gtin_autogen, with_safety: false)
  DateTime.utc_now() |> IO.inspect(label: "passing test end of 1st job")

  assert {:ok, %Oban.Job{conflict?: false}} =
           Queue.add_job(%{"plp_extract_id" => plp_extract.id}, autogen_log_id, @meta)
           |> IO.inspect(label: "passing test 2nd job")

  receive_autogen_finished()
  assert_receive(:final_batch_job)
end

[Two screenshots of the passing test output]

I added timestamps at the start of each test and after the 1st job completed to see if the time taken affected it; however, the times are pretty similar between both tests.

In the failing test, I tried adding Process.sleep(60000) between the 1st job completion and the 2nd job insert to see if the default period: 60 caused it, and the test passed. However, I don't understand why period: 60 causes that test to fail but not the other one.

Expected Behavior

No conflict on 2nd job insert because the 1st job completed.

sorentwo commented 1 month ago

The difference between rc.1 and rc.3+ comes from universally using index backed uniqueness, even with a non-infinite period.

Does the test fail reliably? Does it happen in isolation as well? I've roughly reproduced the test and it passes:

    test "can add the same job after completion" do
      defmodule SomeWorker do
        use Oban.Pro.Worker,
          unique: [
            fields: [:args, :worker],
            keys: [:xid],
            states: [:available, :executing, :retryable]
          ]

        @impl Oban.Pro.Worker
        def process(_job), do: :ok
      end

      name = start_supervised_oban!(testing: :manual)

      insert_job = fn ->
        %{xid: 123}
        |> SomeWorker.new()
        |> then(&Oban.insert(name, &1))
      end

      assert {:ok, %{conflict?: false}} = insert_job.()

      assert %{success: 1} = Oban.drain_queue(name, queue: :default, with_safety: false)

      assert {:ok, %{conflict?: false}} = insert_job.()
    end

General tips aside from fixing this particular issue:

  1. Consider removing the fields override and letting it use the queue as well, unless you schedule that in multiple queues and want them to remain distinct.
  2. Be wary of max_attempts: 1. Transient errors and unexpected shutdowns happen, so be sure you really need a single attempt.
  3. Consider using drain_jobs/1 in your tests instead. It's Smart engine aware, has better defaults, can drain multiple queues, can return the actual jobs, etc.

lamxw2 commented 1 month ago

Does the test fail reliably? Does it happen in isolation as well? I've roughly reproduced the test and it passes:

Yes, it always fails, and it also fails in our GitHub CI tests. I'm not sure what is meant by "in isolation", though. I haven't tested it outside of the test environment.

I appreciate the attempt to reproduce it. It is difficult to determine what the exact cause is since my other nearly identical config passes.

I will try the suggestions and report back if they resolve the issue.

lamxw2 commented 1 month ago

Hi, I've tried removing fields and replacing drain_queue with drain_jobs:

use Oban.Pro.Worker,
  queue: :plp_extracts,
  unique: [
    # fields: [:args],
    keys: [:category_id],
    states: [:available, :executing, :retryable]
  ],
  max_attempts: 1

I added a refute_enqueued just to ensure that the job isn't queued. [screenshot]

The test is still failing (refute_enqueued passes), and I'm still unable to figure out why 🤔 [screenshot]

I've tested both queues on our staging environment, and neither has any issue adding jobs that were already completed, cancelled, or discarded, so there isn't any conflict on staging.

As for max_attempts: 1, I think limiting it to 1 is necessary because some jobs may be quite resource-intensive, so we don't want to blindly retry it on any error. This worker was directly converted from the free Oban, so we haven't really tried exploring conditional retries yet (if it's possible).

sorentwo commented 1 month ago

The test is still failing (refute_enqueued passes), still unable to figure out why

Will you share the output of MyApp.Repo.all(Oban.Job) after that refute_enqueued passes?

The comments about fields, drain_jobs, and max_attempts were just tips and shouldn't have any bearing on your tests. Nice to see you applied some, and it's understandable that you'd want to keep max_attempts: 1 🙂

lamxw2 commented 1 month ago

Sure, here it is. The job for plp_gtin_autogen queue is there because when a plp_extract job completes, it adds the result to that 2nd queue.

false
Repo.all Oban.Job 3: [
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "oban_jobs">,
    id: 170,
    state: "available",
    queue: "plp_gtin_autogen",
    worker: "YStation.Autogen.Industry.Gtin.ObanWorker",
    args: %{"autogen_log_id" => 69, "plp_extract_id" => 1},
    meta: %{
      "gtin_batch_size" => 1,
      "market_id" => 1,
      "plp_batch_size" => 1,
      "uniq" => true,
      "uniq_bmp" => [1, 2, 3],
      "uniq_key" => "xwTVRl8BAXgrr1HvjmsNGC/EPBthJc9Y2qesBULQ8XU"
    },
    tags: [],
    errors: [],
    attempt: 0,
    attempted_by: nil,
    max_attempts: 1,
    priority: 0,
    attempted_at: nil,
    cancelled_at: nil,
    completed_at: nil,
    discarded_at: nil,
    inserted_at: ~U[2024-10-10 15:09:08.691819Z],
    scheduled_at: ~U[2024-10-10 15:09:08.691819Z],
    conf: nil,
    conflict?: false,
    replace: nil,
    unique: nil,
    unsaved_error: nil
  },
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "oban_jobs">,
    id: 169,
    state: "completed",
    queue: "plp_extracts",
    worker: "YStation.Autogen.Industry.PlpExtract.ObanWorker",
    args: %{"autogen_log_id" => 69, "category_id" => 1, "plp_extract_id" => 1},
    meta: %{
      "gtin_batch_size" => 0,
      "market_id" => 1,
      "plp_batch_size" => 1,
      "uniq" => true,
      "uniq_bmp" => [1, 2, 3],
      "uniq_key" => "xwTVRl8BAXgrr1HvjmsNGC/EPBthJc9Y2qesBULQ8XU"
    },
    tags: [],
    errors: [],
    attempt: 1,
    attempted_by: <omitted>,
    max_attempts: 1,
    priority: 0,
    attempted_at: ~U[2024-10-10 15:09:08.818155Z],
    cancelled_at: nil,
    completed_at: ~U[2024-10-10 15:09:08.895286Z],
    discarded_at: nil,
    inserted_at: ~U[2024-10-10 15:09:08.691819Z],
    scheduled_at: ~U[2024-10-10 15:09:08.691819Z],
    conf: nil,
    conflict?: false,
    replace: nil,
    unique: nil,
    unsaved_error: nil
  }
]

sorentwo commented 1 month ago

Those two jobs have the same unique key for different queues. Was that run with fields: [:worker, :args]? If so, that would do it. You need the :queue included to generate a different unique value.
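
In a larger dump, one way to spot this kind of collision is to group the jobs by `meta["uniq_key"]`. The following is a minimal sketch that uses plain maps as stand-ins for the `%Oban.Job{}` structs above. The `UniqKeyAudit` module and its `shared_keys/1` function are hypothetical names for illustration, not part of Oban or Oban Pro.

```elixir
defmodule UniqKeyAudit do
  # Hypothetical helper: returns %{uniq_key => [{queue, id}, ...]} containing
  # only the keys that are carried by more than one job.
  def shared_keys(jobs) do
    jobs
    |> Enum.filter(fn job -> is_binary(job.meta["uniq_key"]) end)
    |> Enum.group_by(fn job -> job.meta["uniq_key"] end, fn job -> {job.queue, job.id} end)
    |> Enum.filter(fn {_key, owners} -> length(owners) > 1 end)
    |> Map.new()
  end
end

# Trimmed stand-ins for the two jobs in the dump above.
jobs = [
  %{
    id: 170,
    queue: "plp_gtin_autogen",
    meta: %{"uniq_key" => "xwTVRl8BAXgrr1HvjmsNGC/EPBthJc9Y2qesBULQ8XU"}
  },
  %{
    id: 169,
    queue: "plp_extracts",
    meta: %{"uniq_key" => "xwTVRl8BAXgrr1HvjmsNGC/EPBthJc9Y2qesBULQ8XU"}
  }
]

shared = UniqKeyAudit.shared_keys(jobs)
IO.inspect(shared, label: "shared uniq_key")
```

Because the helper only reads `job.meta`, `job.queue`, and `job.id`, it should also accept the structs returned by a repo query, though that is untested here.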

lamxw2 commented 1 month ago

I realized I didn't remove it from the 2nd worker, so I just did and reset my test db. Unfortunately, the conflict is still happening.

This is the Repo.all(Oban.Job) output:

[
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "oban_jobs">,
    id: 147,
    state: "available",
    queue: "plp_gtin_autogen",
    worker: "YStation.Autogen.Industry.Gtin.ObanWorker",
    args: %{"autogen_log_id" => 58, "plp_extract_id" => 1},
    meta: %{
      "gtin_batch_size" => 1,
      "market_id" => 1,
      "plp_batch_size" => 1,
      "uniq" => true,
      "uniq_bmp" => [1, 2, 3],
      "uniq_key" => "VGp/bnqW/1wfoaRTiTjZ1WdLbfTvQf3IT6pKdeGUVEY"
    },
    tags: [],
    errors: [],
    attempt: 0,
    attempted_by: nil,
    max_attempts: 1,
    priority: 0,
    attempted_at: nil,
    cancelled_at: nil,
    completed_at: nil,
    discarded_at: nil,
    inserted_at: ~U[2024-10-10 15:22:57.875477Z],
    scheduled_at: ~U[2024-10-10 15:22:57.875477Z],
    conf: nil,
    conflict?: false,
    replace: nil,
    unique: nil,
    unsaved_error: nil
  },
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "oban_jobs">,
    id: 146,
    state: "completed",
    queue: "plp_extracts",
    worker: "YStation.Autogen.Industry.PlpExtract.ObanWorker",
    args: %{"autogen_log_id" => 58, "category_id" => 1, "plp_extract_id" => 1},
    meta: %{
      "gtin_batch_size" => 0,
      "market_id" => 1,
      "plp_batch_size" => 1,
      "uniq" => true,
      "uniq_bmp" => [1, 2, 3],
      "uniq_key" => "VGp/bnqW/1wfoaRTiTjZ1WdLbfTvQf3IT6pKdeGUVEY"
    },
    tags: [],
    errors: [],
    attempt: 1,
    attempted_by: <omitted>,
    max_attempts: 1,
    priority: 0,
    attempted_at: ~U[2024-10-10 15:22:57.882466Z],
    cancelled_at: nil,
    completed_at: ~U[2024-10-10 15:22:57.904260Z],
    discarded_at: nil,
    inserted_at: ~U[2024-10-10 15:22:57.875477Z],
    scheduled_at: ~U[2024-10-10 15:22:57.875477Z],
    conf: nil,
    conflict?: false,
    replace: nil,
    unique: nil,
    unsaved_error: nil
  }
]

Test output: [screenshot]

sorentwo commented 1 month ago

How are you enqueuing the second job? Is it copying the meta from the first job? Those two jobs should have completely different unique keys:

iex(6)> key_1 =
...(6)>   (%{"autogen_log_id" => 58, "plp_extract_id" => 1}
...(6)>   |> Oban.Job.new(queue: "plp_gtin_autogen", unique: unique, worker: "YStation.Autogen.Industry.Gtin.ObanWorker")
...(6)>   |> Oban.Pro.Unique.gen_key())
"O/SfOvK99RrwVlRgOvXPBMi3uKioR9K8vWM7dYir1yQ"

iex(7)> key_2 =
...(7)>   (%{"autogen_log_id" => 58, "category_id" => 1, "plp_extract_id" => 1}
...(7)>   |> Oban.Job.new(queue: "plp_extracts", unique: unique, worker: "YStation.Autogen.Industry.PlpExtract.ObanWorker")
...(7)>   |> Oban.Pro.Unique.gen_key())
"glWTeZEbbLT44N6refCeWT2nymF4LmP/4KlrCWoCkwE"
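
To illustrate why jobs with different workers, queues, or args should end up with different keys, here is a rough sketch of a hash-based key built from selected job fields. It is an illustration only: `KeySketch.gen_key/2` is a hypothetical function that does not reproduce the algorithm behind `Oban.Pro.Unique.gen_key/1`, so its output will not match the keys shown above. The default field list is assumed to be `[:args, :queue, :worker]`.

```elixir
defmodule KeySketch do
  # Hypothetical: hash only the chosen fields and, for args, only the chosen keys.
  def gen_key(job, opts) do
    fields = Keyword.get(opts, :fields, [:args, :queue, :worker])
    keys = opts |> Keyword.get(:keys, []) |> Enum.map(&Atom.to_string/1)

    fields
    |> Enum.sort()
    |> Enum.map(fn
      :args when keys != [] -> {:args, job.args |> Map.take(keys) |> Enum.sort()}
      :args -> {:args, Enum.sort(job.args)}
      field -> {field, Map.fetch!(job, field)}
    end)
    |> :erlang.term_to_binary()
    |> then(&:crypto.hash(:sha256, &1))
    |> Base.encode64(padding: false)
  end
end

# Plain maps standing in for the two jobs from the dump.
extract = %{
  worker: "YStation.Autogen.Industry.PlpExtract.ObanWorker",
  queue: "plp_extracts",
  args: %{"autogen_log_id" => 58, "category_id" => 1, "plp_extract_id" => 1}
}

gtin = %{
  worker: "YStation.Autogen.Industry.Gtin.ObanWorker",
  queue: "plp_gtin_autogen",
  args: %{"autogen_log_id" => 58, "plp_extract_id" => 1}
}

key_1 = KeySketch.gen_key(extract, fields: [:args, :worker], keys: [:category_id])
key_2 = KeySketch.gen_key(gtin, fields: [:args, :worker], keys: [:plp_extract_id])

IO.inspect(key_1 != key_2, label: "keys differ")
```

A key computed this way can only be identical for two different workers if it was copied rather than generated, which is what the identical `uniq_key` values in the dump point to.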

lamxw2 commented 1 month ago

Hi @sorentwo, that's exactly it! I changed it to Map.take only the necessary keys, re-ran the test, and it passes now. Has this unique key always existed, or is it something new from upgrading to Pro?

I'll have to take a look at all the other Workers to ensure the same thing doesn't happen. That's also something for me to take note of in the future. Thanks for all the help!
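
For anyone hitting the same conflict: the fix amounts to not forwarding the parent job's whole `meta` to the follow-up job, because that map carries the `uniq`, `uniq_bmp`, and `uniq_key` entries visible in the dumps above. A minimal sketch with plain maps follows. `ChildMeta.from_parent/1` and `@forwarded_meta_keys` are hypothetical names, and the list of keys is an assumption based on the meta shown in this thread.

```elixir
defmodule ChildMeta do
  # Assumed application-level keys, taken from the meta in the dumps above.
  @forwarded_meta_keys ["gtin_batch_size", "market_id", "plp_batch_size"]

  # Keep only the application's own keys and drop everything else,
  # including the "uniq", "uniq_bmp", and "uniq_key" entries.
  def from_parent(parent_meta), do: Map.take(parent_meta, @forwarded_meta_keys)
end

# Meta of the completed plp_extracts job, as shown in the dump.
parent_meta = %{
  "gtin_batch_size" => 0,
  "market_id" => 1,
  "plp_batch_size" => 1,
  "uniq" => true,
  "uniq_bmp" => [1, 2, 3],
  "uniq_key" => "VGp/bnqW/1wfoaRTiTjZ1WdLbfTvQf3IT6pKdeGUVEY"
}

child_meta = ChildMeta.from_parent(parent_meta)
IO.inspect(child_meta, label: "meta passed to the follow-up job")
```

An allowlist like this is a design choice: compared with deleting the three `uniq*` entries by name, it keeps working if further bookkeeping keys are added to `meta` in a later release.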

sorentwo commented 1 month ago

Has this unique key always existed, or is it something new from upgrading to Pro?

It has existed for a few versions, but the index wasn't in place before. The new behavior is more accurate and consistent between transactions.

Glad we figured it out! Closing this out as completed.