superfly / fly_postgres_elixir

Library for working with local read-replica postgres databases and performing writes through RPC calls to other nodes in the primary Fly.io region.
https://hex.pm/packages/fly_postgres
Apache License 2.0

Documentation request: Oban #18

Open dnsbty opened 2 years ago

dnsbty commented 2 years ago

Hey Mark! Awesome repo! I'm working on adding this to a project and it all seems pretty straightforward except for Oban.

The Problem

For those who may not be familiar, Oban allows you to do job processing with Postgres as its data store. Normally you configure Oban with something like this in your config/config.exs file:

config :my_app, Oban,
  repo: MyApp.Repo,
  plugins: [Oban.Plugins.Pruner],
  queues: [default: 10]

After installing fly_postgres_elixir, Oban fails to start with the following exception:

** (Mix) Could not start application my_app: MyApp.Application.start(:normal, []) returned an error: shutdown: failed to start child: Oban
    ** (EXIT) an exception was raised:
        ** (ArgumentError) expected :repo to be an Ecto.Repo, got: MyApp.Repo
            (oban 2.10.1) lib/oban/config.ex:193: Oban.Config.validate_opt!/1
            (elixir 1.12.3) lib/enum.ex:930: Enum."-each/2-lists^foreach/1-0-"/2
            (oban 2.10.1) lib/oban/config.ex:55: Oban.Config.new/1
            (oban 2.10.1) lib/oban.ex:151: Oban.start_link/1
            (stdlib 3.15.2) supervisor.erl:414: :supervisor.do_start_child_i/3
            (stdlib 3.15.2) supervisor.erl:400: :supervisor.do_start_child/2
            (stdlib 3.15.2) supervisor.erl:384: anonymous fn/3 in :supervisor.start_children/2
            (stdlib 3.15.2) supervisor.erl:1234: :supervisor.children_map/4
            (stdlib 3.15.2) supervisor.erl:350: :supervisor.init_children/2
            (stdlib 3.15.2) gen_server.erl:423: :gen_server.init_it/2
            (stdlib 3.15.2) gen_server.erl:390: :gen_server.init_it/6
            (stdlib 3.15.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

If I change the configuration to point to MyApp.Repo.Local, the application starts, but if I understand correctly this will cause problems down the line: Oban jobs are inserted with Oban.insert/2, which would then write to the local database, and that database may be a read-only replica.

My Solution

I would love feedback on this solution as I'm sure there might be a better way to do it, but I updated my application module to look like this:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Fly.RPC, []},
      MyApp.Repo.Local,
      {Fly.Postgres.LSN.Tracker, repo: MyApp.Repo.Local},
      ...
      oban(),
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]

    children
    |> Enum.reject(&is_nil/1)
    |> Supervisor.start_link(opts)
  end
  ...
  defp oban do
    if Fly.is_primary?() do
      config = Application.fetch_env!(:my_app, Oban)
      {Oban, config}
    end
  end
end

Then I created a new Background module that uses a macro to wrap all the Oban functions so that they will all be called against the primary region every time.

defmodule MyApp.Background do
  @moduledoc """
  This module serves as a wrapper around Oban so that it will work properly
  with `Fly.Repo` instead of an `Ecto.Repo`.
  """

  for {func, arity} <- Oban.__info__(:functions), func not in [:child_spec, :init, :start_link] do
    args = Macro.generate_arguments(arity, __MODULE__)

    @doc """
    See documentation for Oban.#{func}/#{arity}
    """
    def unquote(func)(unquote_splicing(args)) do
      if Fly.is_primary?() do
        Oban.unquote(func)(unquote_splicing(args))
      else
        Fly.RPC.rpc_region(Fly.primary_region(), Oban, unquote(func), unquote(args))
      end
    end
  end
end
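The wrapper-generation technique above can be tried without Oban or Fly installed. The following is only a minimal sketch: `Sketch.Jobs` is a hypothetical stand-in for Oban, and `forward/3` is a hypothetical stand-in for the RPC call to the primary region (here it just calls the function locally).

```elixir
defmodule Sketch.Jobs do
  # Hypothetical stand-in for Oban so the sketch has no dependencies.
  def insert(job), do: {:ok, job}
  def cancel_job(id), do: {:cancelled, id}
  def start_link(_opts), do: :ignore
end

defmodule Sketch.Background do
  # Hypothetical stand-in for forwarding the call to the primary region.
  # In the real module this is where the Fly RPC call would go.
  def forward(module, func, args), do: apply(module, func, args)

  # Generate one wrapper per public function of the target module,
  # skipping the process-related functions.
  for {func, arity} <- Sketch.Jobs.__info__(:functions), func not in [:start_link] do
    # Builds `arity` unique argument variables for the generated function head.
    args = Macro.generate_arguments(arity, __MODULE__)

    def unquote(func)(unquote_splicing(args)) do
      forward(Sketch.Jobs, unquote(func), unquote(args))
    end
  end
end
```

Calling `Sketch.Background.insert(%{id: 1})` goes through `forward/3` and ends up in `Sketch.Jobs.insert/1`, while `start_link/1` is not wrapped at all.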

Am I overlooking a simpler solution? I was thinking about making a video about how to do this, but it might make more sense for it to live in the official documentation, where others can find it more easily. I also considered making the Background module a supervisor that decides whether or not to start the Oban process, so that everything would be contained within that one module. It could then maybe be released as a separate library if that would have value to others, though I wasn't sure that's worthwhile given how simple it is. I would love to know others' thoughts.

dnsbty commented 2 years ago

Oh one more thing in case anyone else finds this: You will want to make sure anywhere you are calling use Oban.Testing, repo: MyApp.Repo is updated to use Oban.Testing, repo: MyApp.Repo.Local as well.

brainlid commented 2 years ago

Hey @dnsbty! Thanks for writing this up!

Previously, I talked with Parker Selbert about how to best work with Oban in a situation like this. We determined that Oban really assumes it's in close proximity to the writable primary database. So the jobs should only be run from the primary region. He said the queues could be configured to only run on the primary. Here's the approach I took:

https://github.com/fly-apps/hello_elixir/pull/5

I don't have a lot of personal experience running Oban in production, so your extra step of wrapping the Oban interface seems like a really good idea. I'd have to play with the wrapper a bit to test it out.

Could it be simplified like this?

    def unquote(func)(unquote_splicing(args)) do
      Fly.rpc_primary(Oban, unquote(func), unquote(args))
    end

Fly.RPC.rpc_region already executes it locally if it's in the primary region or forwards it on if it's not. Does that change still work for you?

If so, I'd love to document it with the library to help people out.

brainlid commented 2 years ago

I think your application setup is cleaner though. I like that.

fedeotaran commented 2 years ago

I had exactly the same error. In my case, our solution was to create another repo for Oban with the database URL for the primary region:

# config/runtime.exs

config :myapp, MyApp.Oban.Repo,
  url: System.fetch_env!("DATABASE_URL"),
  socket_options: [:inet6],
  pool_size: String.to_integer(System.get_env("POOL_SIZE", "10")),
  priv: "priv/repo",
  migration_lock: nil,
  queue_target: 5000

# config/config.exs

config :myapp, Oban,
  repo: MyApp.Oban.Repo, # pass the repo here
  queues: [
    default: 10,
    upload: 3,
    ...
  ],
  plugins: [
    Oban.Plugins.Pruner,
    {
      Oban.Plugins.Cron,
      crontab: [
        # Inserted at midnight every day
        ...
      ]
    }
  ]

# lib/myapp/application.ex

  defp oban_opts do
    opts = Application.get_env(:myapp, Oban)

    if primary_region?() do
      opts
    else
      # Non-primary regions run no queues or plugins. A custom notifier
      # is also needed to avoid some queries.
      opts
      |> Keyword.put(:queues, false)
      |> Keyword.put(:plugins, false)
      |> Keyword.put(:notifier, MyApp.DummyNotifier)
    end
  end

  defp primary_region? do
    System.get_env("FLY_REGION") == System.get_env("PRIMARY_REGION")
  end

But this solution you guys show seems simpler! I'll try! :)

brainlid commented 2 years ago

@fedeotaran I hope there will be some recommended configs for Oban and fly_postgres coming soon!

BTW, you can use Fly.is_primary?() as a built-in helper to replace the primary_region? function above. It does the same thing, so you were right about that. :slightly_smiling_face:

brainlid commented 2 years ago

Just an update to report on current progress and to explain that this isn't forgotten.

UPDATE: @sorentwo worked with me to help figure out what was needed for Oban support. The fly_postgres library was updated following his recommendations. I think there may still be some config and documentation to add here, but it has become simpler. I think the big change is to disable the Oban process from running outside of the primary region.

I also believe a blog post is coming soon from Parker that may help as well.

sorentwo commented 2 years ago

The getoban.pro app runs in 7 regions, but only the primary has a Postgres instance. A few weeks ago (right around the time this post started) I experimented with a replication based Oban deployment. Here is what I discovered:

  1. Postgres based notifications don’t work. You must switch to a distributed erlang notifier like Oban.Notifiers.PG.
  2. Oban instances running in non-primary regions shouldn’t run any plugins, which also prevents them from trying to take leadership of the cluster. The next Oban version has an explicit mechanism to disable leadership for this exact reason.
  3. Oban Web has issues in other regions because it expects some quick write/read cycles and may time out. It also requires the use of a stats plugin, which triggers leadership and gets into the situation in the previous point.

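This is all to say that Oban should work perfectly fine with Fly Postgres if you adhere to the criteria above: use a distributed Erlang notifier such as Oban.Notifiers.PG, and don't run plugins or queues in non-primary regions.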

That setup will allow non-primary regions to insert and query jobs, but not execute them—which is exactly what you want.

The easiest way to accomplish this is to do a check in your runtime.exs and conditionally disable queues and plugins. That would look something like this (written on a phone, bear with me):

unless System.get_env("FLY_REGION") do
  System.put_env("FLY_REGION", "ord")
end

unless System.get_env("PRIMARY_REGION") do
  System.put_env("PRIMARY_REGION", "ord")
end

primary? = System.get_env("FLY_REGION") == System.get_env("PRIMARY_REGION")

if config_env() == :prod do
  unless primary? do
    config :my_app, Oban,
      queues: false,
      plugins: false
  end
end

Hope the info is helpful!
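The first discovery in the list above (Postgres notifications not working with replicas) comes down to one extra configuration key. A minimal sketch, assuming the `:my_app` name and the repo, plugin, and queue settings from the example at the top of this thread; only the `notifier` line is new:

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # Use the distributed Erlang notifier instead of Postgres notifications.
  notifier: Oban.Notifiers.PG,
  plugins: [Oban.Plugins.Pruner],
  queues: [default: 10]
```

Because the notifier has to match on every node, it makes sense to set it for all regions and leave only the queue and plugin settings conditional.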

benonymus commented 1 year ago

Hey,

I found a lot of useful information here, thank you!

I got Oban working with fly_postgres_elixir with a multi instance setup.

When I do a deployment I see a lot of logs like these:

app[d11a6ec1] ord [info] ** (ArgumentError) no node found running in region "sin"
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban_pro 0.13.0) lib/oban/pro/queue/smart_engine.ex:70: anonymous fn/2 in Oban.Pro.Queue.SmartEngine.init/2
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (fly_postgres 0.3.1) lib/fly_postgres.ex:170: Fly.Postgres.rpc_and_wait/4
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban 2.14.1) lib/oban/engine.ex:282: anonymous fn/3 in Oban.Engine.with_span/4
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban_pro 0.13.0) lib/oban/pro/queue/smart_engine.ex:947: Oban.Pro.Queue.SmartEngine.with_retry/3
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban 2.14.1) lib/oban/queue/producer.ex:64: Oban.Queue.Producer.handle_continue/2
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (telemetry 1.2.1) /app/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] Last message: {:continue, {:start, [limit: 50, queue: "google_calendar_sync"]}}
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] ** (ArgumentError) no node found running in region "sin"
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] 09:37:15.131 [error] GenServer {Oban.Registry, {Oban, {:producer, "events"}}} terminating
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (fly_postgres 0.3.1) lib/fly_postgres.ex:170: Fly.Postgres.rpc_and_wait/4
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (fly_rpc 0.2.0) lib/fly_rpc.ex:110: Fly.RPC.rpc_region/5
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban_pro 0.13.0) lib/oban/pro/queue/smart_engine.ex:947: Oban.Pro.Queue.SmartEngine.with_retry/3
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban_pro 0.13.0) lib/oban/pro/queue/smart_engine.ex:70: anonymous fn/2 in Oban.Pro.Queue.SmartEngine.init/2
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (telemetry 1.2.1) /app/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban 2.14.1) lib/oban/engine.ex:282: anonymous fn/3 in Oban.Engine.with_span/4
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] (oban 2.14.1) lib/oban/queue/producer.ex:64: Oban.Queue.Producer.handle_continue/2
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] 09:37:15.186 [info] Discovered node :"staging-betafi@fdaa:0:5b60:a7b:80:b278:8af9:2" in region sin
2023-02-08T09:37:15.763 app[d11a6ec1] ord [info] Last message: {:continue, {:start, [limit: 50, queue: "events"]}}
2023-02-08T09:37:15.777 app[d11a6ec1] ord [info] 09:37:15.374 [info] Running BetafiWeb.Endpoint with cowboy 2.9.0 at :::8080 (http)
2023-02-08T09:37:15.777 app[d11a6ec1] ord [info] 09:37:15.749 [error] GenServer {Oban.Registry, {Oban, {:producer, "mailers"}}} terminating

sin is my primary region and ord is the only non-primary for now.

Is this normal or expected? It lasts for about 30 seconds. Everything still seems to work.

My Oban setup follows this comment: https://github.com/superfly/fly_postgres_elixir/issues/18#issuecomment-1076388188

I made the changes to the beam_cookie that are outlined here: https://github.com/superfly/fly_postgres_elixir#prevent-temporary-outages-during-deployments, as well as the changes to the backup regions. Everything comes back as expected, and everything, even Oban, seems to work during redeployment.

Thank you

benonymus commented 1 year ago

I am running into another issue now.

2023-02-09T09:39:42.798 app[55c15931] maa [info] (fly_postgres 0.3.1) lib/fly_postgres.ex:187: Fly.Postgres.rpc_and_wait/4
2023-02-09T09:39:42.798 app[55c15931] maa [info] (oban 2.14.1) lib/oban/stager.ex:138: Oban.Stager.stage_scheduled/2
2023-02-09T09:39:42.798 app[55c15931] maa [info] (oban 2.14.1) lib/oban/stager.ex:115: anonymous fn/2 in Oban.Stager.check_leadership_and_stage/1
2023-02-09T09:39:42.798 app[55c15931] maa [info] (ecto_sql 3.9.2) lib/ecto/adapters/sql.ex:1203: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
2023-02-09T09:39:42.798 app[55c15931] maa [info] (db_connection 2.4.3) lib/db_connection.ex:1611: DBConnection.run_transaction/4
2023-02-09T09:39:42.798 app[55c15931] maa [info] (oban 2.14.1) lib/oban/stager.ex:86: anonymous fn/2 in Oban.Stager.handle_info/2
2023-02-09T09:39:42.798 app[55c15931] maa [info] (telemetry 1.2.1) /app/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
2023-02-09T09:39:42.798 app[55c15931] maa [info] (oban 2.14.1) lib/oban/stager.ex:85: Oban.Stager.handle_info/2
2023-02-09T09:39:42.798 app[55c15931] maa [info] Last message: :stage

I see these logs on the non-primary instance constantly. The config of it is this:

[
  engine: Oban.Pro.Queue.SmartEngine,
  repo: xxxxx.Repo,
  notifier: Oban.Notifiers.PG,
  peer: Oban.Peers.Global,
  plugins: [],
  queues: false
]

Does Oban need the local repo? It does not look like it. I am wondering because it performs inserts, etc.

Maybe the config needs to change, @sorentwo, now that Oban.Stager is no longer a plugin?

Is wrapping the Oban calls necessary? It is not.

It seems that during deploys or crashes, the non-primary Oban instance becomes the leader and things go haywire. According to its config, even when it is the leader it has no plugins or queues.

sorentwo commented 1 year ago

@benonymus The config needs a little tweak now that stager isn't a plugin. Normally plugins: false would disable peering, but it's overridden because peer is set to Global. (That may be a bug).

This tweak to the non-primary configuration should do it:

unless primary? do
  config :my_app, Oban,
    queues: false,
    plugins: false,
    peer: false
end

benonymus commented 1 year ago

Hey, Thanks a lot! Just to clarify, we still need to use plugins: false? In the docs it says plugins: [] https://hexdocs.pm/oban/v2-11.html#check-configuration-for-multi-node-setups

sorentwo commented 1 year ago

Great point! That section describes how to retain job staging without running any plugins. In this case, you want to disable job staging and plugins so that non-primary nodes can only insert jobs.
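For apps that build the Oban options in code rather than in runtime.exs, the same non-primary overrides can be expressed as a small pure function. This is only a sketch: `ObanOptsSketch` and `for_region/2` are hypothetical names, and the three overrides are the ones from the configuration earlier in this thread.

```elixir
defmodule ObanOptsSketch do
  @moduledoc "Hypothetical helper that derives Oban options per region."

  @doc """
  Returns `base_opts` unchanged for the primary region. For any other
  region, queues, plugins, and peering are disabled so the node can
  insert jobs but never runs or stages them.
  """
  def for_region(base_opts, primary?) when is_list(base_opts) and is_boolean(primary?) do
    if primary? do
      base_opts
    else
      Keyword.merge(base_opts, queues: false, plugins: false, peer: false)
    end
  end
end
```

With `base = [repo: MyApp.Repo, queues: [default: 10]]`, `ObanOptsSketch.for_region(base, false)` keeps the `:repo` entry and replaces `:queues` with `false`.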

benonymus commented 1 year ago

Hey @sorentwo,

If we have this config, the Oban dashboard only works in the primary region. For example, say you have an app running in Singapore and the US. If you connect to the instance in Singapore, the dashboard works; if you connect to the one in the US, it does not. (This can be circumvented with a VPN, but that is inconvenient.) This is problematic for globally distributed teams. Any idea how we can make the dashboard work from all regions?

EDIT: I changed the config a bit:

        queues: false,
        plugins: [Oban.Web.Plugins.Stats],
        peer: false

This seems to work, but I wonder if it can cause any problems.

Thanks

sorentwo commented 1 year ago

That updated config will work fine, and is what I would recommend. Some operations like bulk canceling won’t work because they require a db write, but you could disable operations in replica regions using a resolver: https://getoban.pro/docs/web/2.9.6/customizing.html#action-controls
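A resolver along those lines might look like the sketch below. It assumes, based on my reading of the linked Oban Web docs, that the resolver callback is `resolve_access/1` and that returning `:read_only` disables write actions; check both against the docs for your version. `MyApp.ObanResolver` is a hypothetical name, and the region check reuses the environment variables shown earlier in this thread.

```elixir
defmodule MyApp.ObanResolver do
  # In a real app this module would declare `@behaviour Oban.Web.Resolver`
  # (assumed name, requires Oban Web). It is left out here so the sketch
  # compiles on its own.

  # Assumed callback: receives the current user, returns the access level.
  def resolve_access(_user) do
    if primary_region?() do
      :all
    else
      # Replica regions cannot write, so only allow viewing.
      :read_only
    end
  end

  defp primary_region? do
    System.get_env("FLY_REGION") == System.get_env("PRIMARY_REGION")
  end
end
```

The module would then be handed to the dashboard through the resolver option described in the linked docs.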