cpursley / walex

Postgres change events (CDC) in Elixir
MIT License

Supervisor not reloading the listener #25

Closed Bogdan1001 closed 7 months ago

Bogdan1001 commented 8 months ago

Hi, after an error occurs in the pipeline, the process terminates completely and the supervisor does not restart it.

cpursley commented 8 months ago

Hi @Bogdan1001 - there might be some bugs in the latest version. Do you mind sharing what you're seeing in the logs?

DohanKim commented 8 months ago

It happened to me today. On a long-running server (which had been working correctly for about a week), the WalEx process suddenly stopped and hasn't restarted. I can't see any errors in the logs. @cpursley, can you give me some hints on where to start investigating?

(Using version 2.3.0.)

DohanKim commented 7 months ago

It seems that no logging or telemetry is working in the project. Is that right?

DohanKim commented 7 months ago

I succeeded in reproducing the situation (on the latest version, 3.6.0). I'm investigating the problem and looking for a solution.

defmodule WalEx.ErrorTest do
  use ExUnit.Case, async: false

  alias WalEx.Supervisor, as: WalExSupervisor

  require Logger

  @test_database "todos_test"

  describe "error handling" do
    test "error test" do
      {:ok, pid} =
        Postgrex.start_link(
          hostname: "localhost",
          username: "postgres",
          password: "postgres",
          database: @test_database
        )

      configs_1 = get_base_configs()
      # The supervisor pid is not used again, so it is underscored to avoid a compiler warning.
      assert {:ok, _pid_1} = WalExSupervisor.start_link(configs_1)

      users = """
        INSERT INTO \"user\" (email, name, age)
        VALUES
          ('test.user@example.com', 'Test User', 28);
      """

      Postgrex.query!(pid, users, [])
      |> tap(&Logger.debug("Inserted user: #{inspect(&1, pretty: true)}"))

      pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"

      assert [
               %{"active" => true, "slot_name" => _slot_name, "slot_type" => "logical"}
               | _replication_slots
             ] =
               query(pid, pg_replication_slots)
               |> tap(&Logger.debug("Replication slots: #{inspect(&1, pretty: true)}"))

      :timer.sleep(3000)

      delete_users = """
        DELETE FROM "user" WHERE email = 'test.user@example.com';
      """

      Postgrex.query!(pid, delete_users, [])

      :timer.sleep(3000)
    end
  end

  def query(pid, query) do
    pid
    |> Postgrex.query!(query, [])
    |> map_rows_to_columns()
  end

  def map_rows_to_columns(%Postgrex.Result{columns: columns, rows: rows}) do
    Enum.map(rows, fn row -> Enum.zip(columns, row) |> Map.new() end)
  end

  defp get_base_configs(keys \\ []) do
    configs = [
      name: :test_name,
      hostname: "localhost",
      username: "postgres",
      password: "postgres",
      database: @test_database,
      port: 5432,
      subscriptions: [:user, :todo],
      publication: ["events"],
      modules: [WalEx.TestModule]
    ]

    case keys do
      [] -> configs
      _keys -> Keyword.take(configs, keys)
    end
  end
end

defmodule WalEx.TestModule do
  use WalEx.Event, name: :test_name

  require Logger

  # Deliberately raises on every transaction to reproduce the crash.
  def process_all(_transaction) do
    Logger.error("test module error raised")
    raise "test error"
  end
end

DohanKim commented 7 months ago

I created a PR:

https://github.com/cpursley/walex/pull/28

cpursley commented 7 months ago

Thanks @DohanKim ~ can you make another run through the tests? They're failing for me.

cpursley commented 7 months ago

I need to figure out a strategy for testing with a database in CI (for now, I skip database-related tests in CI).

cpursley commented 7 months ago

@DohanKim I set up CI to also run database tests.

Can you pull the recent master changes and create a new PR?

And also address this failing test:

1) test start_link/2 should start multiple supervision trees (WalEx.SupervisorTest)
     test/walex/supervisor_test.exs:19

DohanKim commented 7 months ago

I will pull the recent master and check the test you mentioned. Thank you for the fast merge.

DohanKim commented 7 months ago

@cpursley seems like we need to decide among

I think the latter is better for error resiliency. Which one do you prefer?

cpursley commented 7 months ago

@DohanKim I like the idea of one publisher per app name, so that a single WalEx app can subscribe to multiple databases.

cpursley commented 7 months ago

@DohanKim

Thoughts on how to test this? I'm trying to test when the module with the event-processing code is called, but it doesn't actually seem to get called. I renamed your error test file to event_test (and plan to test the Event module in general).

https://github.com/cpursley/walex/pull/31/files#diff-a6220a12664372b9c45914e723d2a7934f7ae8052fa6ba1411c35f26de9bcc2fR75

cpursley commented 7 months ago

I merged #31 to master and plan some other potentially breaking changes before cutting a new release.

DohanKim commented 7 months ago

Wow, I was implementing something similar at a much slower pace, trying not to break your code, but you already finished it. Thanks. Checking that the events are received and the processes are restarted looks like enough for now.