Open DohanKim opened 6 months ago
I'm looking into supabase realtime codes to get some hints but can't figure out for now
Even though it is happening in my prod app once a week, (DB connection closed temporarily -> DB connection recovered -> all processes start working again except WalEx event) I can't manage to reproduce it in the test or dev environment.
Currently closing the issue while migrating my app from v2.3.0 to the latest version of WalEx.
Will reopen the issue when it happens again in the latest version.
FYI
it works really well on the latest version.
Super! Thank you for your help on this.
I also pushed up a few other changes (mainly, supervisor strategies): https://github.com/cpursley/walex/commit/e2a2c0f575f665986f963aebb43e776897f830fa
But haven't cut a release for it yet.
It happened again today with the latest version. Again, the symptom is that after a short DB connection error, all other DB-related processes work again except WalEx. It's tough to debug the problem as there is no error from WalEx at all. I suspect that reconnecting or re-initiating to the WAL slot is not working properly.
@cpursley can you please give me some advice as this is a really critical issue on my prod service?
Hum, sorry to hear that. Can you try the last master (reference the github repo instead of last release)? Supervisors are set to restart more often. And maybe fork latest master and add some logging in various places?
will try that. thanks.
Happened today with the latest master branch.
Same symptom.
WAL slot connected successfully when I restart elixir application with :init.restart
@DohanKim Is there anything showing up in logs or your error reporting system (it sounds like no, but thought I'd ask). I'd like to help but not sure how to set up a scenario that reproduces the issue.
I am only getting this error from Ecto. (DBConnection.ConnectionError tcp recv: closed)
There are no other error messages, unfortunately.
I spent a couple of days reproducing the issue but was not successful.
The scenario I suspect is WalEx connects to a WAL slot -> DB restarts -> WalEx tries to reconnect to the WAL slot which is closed already
I will try more and share the results here.
Thank you for the update and sorry for the trouble.
Perhaps we could write some additional tests here to test the scenario of a restarting db: https://github.com/cpursley/walex/blob/master/test/walex/database_test.exs#L36
happened again with v3.8.0. now investigating 🥲
I wonder if we should instead of creating a slot with "walex_temp_slot_" <> Integer.to_string(:rand.uniform(9_999))
type of naming, we should create a more consistent name and try to connect to that or recreate if it does not exist?
https://github.com/cpursley/walex/blob/master/lib/walex/replication/server.ex#L41
@cpursley that would be a good idea to create a slot name with app_name. let me first write a test case reproducing the error.
@DohanKim could you create a PR with the experiment you are doing? That way I could also pull down and investigate. Thank you for you help on this!
@cpursley This is the test code I'm currently working on. You can just replace it with database_test.exs
defmodule WalEx.DatabaseTest do
use ExUnit.Case, async: false
import WalEx.Support.TestHelpers
alias WalEx.Supervisor, as: WalExSupervisor
require Logger
@hostname "localhost"
@username "postgres"
@password "postgres"
@database "todos_test"
@base_configs [
name: :todos,
hostname: @hostname,
username: @username,
password: @password,
database: @database,
port: 5432,
subscriptions: ["user", "todo"],
publication: "events"
]
describe "logical replication" do
setup do
{:ok, database_pid} = start_database()
%{database_pid: database_pid}
end
test "should have logical replication set up", %{database_pid: pid} do
show_wall_level = "SHOW wal_level;"
assert is_pid(pid)
assert [%{"wal_level" => "logical"}] == query(pid, show_wall_level)
end
test "should start replication slot", %{database_pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs)
assert is_pid(replication_pid)
pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"
assert [
%{"active" => true, "slot_name" => slot_name, "slot_type" => "logical"}
| _replication_slots
] = query(database_pid, pg_replication_slots)
assert String.contains?(slot_name, "walex_temp_slot")
end
test "should re-initiate after DB reconnection", %{database_pid: database_pid} do
{:ok, supervisor_pid} = TestSupervisor.start_link()
database_pid =
Supervisor.which_children(supervisor_pid)
|> tap(&Logger.debug("Children" <> inspect(&1)))
|> Enum.find(&match?({DBConnection.ConnectionPool, _, _, _}, &1))
|> elem(1)
|> tap(&Logger.debug("Database pid" <> inspect(&1)))
pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"
query(database_pid, pg_replication_slots)
|> tap(&Logger.debug("Replication slots" <> inspect(&1)))
name =
WalEx.Config.Registry.set_name(:set_gen_server, WalEx.Replication.Server, :todos)
|> tap(&Logger.debug("Server name" <> inspect(&1)))
replication_server_pid =
GenServer.whereis(name)
|> tap(&Logger.debug("Server pid" <> inspect(&1)))
# {output, exit_code} = System.cmd("sudo", ["service", "postgresql", "restart"])
Process.info(database_pid) |> tap(&Logger.debug("Database pid" <> inspect(&1)))
update_user(database_pid)
:timer.sleep(3000)
Supervisor.terminate_child(supervisor_pid, DBConnection.ConnectionPool)
|> tap(&Logger.debug("Terminated" <> inspect(&1)))
Process.info(database_pid) |> tap(&Logger.debug("Database pid" <> inspect(&1)))
Supervisor.which_children(supervisor_pid)
|> tap(&Logger.debug("Children" <> inspect(&1)))
# Process.exit(database_pid, :kill)
Logger.debug("waiting")
:timer.sleep(3000)
Logger.debug("done waiting")
Supervisor.restart_child(supervisor_pid, DBConnection.ConnectionPool)
|> tap(&Logger.debug("Restarted" <> inspect(&1)))
Supervisor.which_children(supervisor_pid)
|> tap(&Logger.debug("Children" <> inspect(&1)))
database_pid =
Supervisor.which_children(supervisor_pid)
|> tap(&Logger.debug("Children" <> inspect(&1)))
|> Enum.find(&match?({DBConnection.ConnectionPool, _, _, _}, &1))
|> elem(1)
|> tap(&Logger.debug("Database pid" <> inspect(&1)))
update_user(database_pid)
database_pid =
Supervisor.which_children(supervisor_pid)
|> tap(&Logger.debug("Children" <> inspect(&1)))
|> Enum.find(&match?({DBConnection.ConnectionPool, _, _, _}, &1))
|> elem(1)
query(database_pid, pg_replication_slots)
|> tap(&Logger.debug("Replication slots" <> inspect(&1)))
# assert [
# %{"active" => true, "slot_name" => slot_name, "slot_type" => "logical"}
# | _replication_slots
# ] = query(database_pid, pg_replication_slots)
# assert String.contains?(slot_name, "walex_temp_slot")
end
end
def start_database do
Postgrex.start_link(
hostname: @hostname,
username: @username,
password: @password,
database: @database
)
end
def query(pid, query) do
pid
|> Postgrex.query!(query, [])
|> map_rows_to_columns()
end
def map_rows_to_columns(%Postgrex.Result{columns: columns, rows: rows}) do
Enum.map(rows, fn row -> Enum.zip(columns, row) |> Map.new() end)
end
def map_rows_to_columns(_result), do: []
end
defmodule TestSupervisor do
use Supervisor
@hostname "localhost"
@username "postgres"
@password "postgres"
@database "todos_test"
@base_configs [
name: :todos,
hostname: @hostname,
username: @username,
password: @password,
database: @database,
port: 5432,
subscriptions: ["user", "todo"],
publication: "events",
destinations: [
modules: [TestModule]
]
]
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
children = [
{Postgrex,
[hostname: @hostname, username: @username, password: @password, database: @database]},
{WalEx.Supervisor, @base_configs}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
defmodule TestModule do
require Logger
use WalEx.Event, name: :test_app
on_event(
:all,
fn events -> Logger.info("on_event event occurred: #{inspect(events, pretty: true)}") end
)
on_update(
:user,
[],
fn events -> Logger.info("on_update event occurred: #{inspect(events, pretty: true)}") end
)
end
@DohanKim I took your idea and created a test branch here: https://github.com/cpursley/walex/pull/46
I also set slot name to the app name: https://github.com/cpursley/walex/pull/46/files#diff-f7aa5bafef0b9d259456d1b5344450f3ae79ce730a61d65d6e0cae665592ad4cR43
Please let me know what if this covers the situation you've been experiencing. Feel free to make your own changes. I want to be sure we cover all possible connection cases.
Thanks!
I added another test case that attempts to stop Postgres via command line. It seems to work on MacOS where Postgres was installed via Postgres.app. Postgres on MacOS via homebrew is also covered but untested.
Also, linux (Debian) is covered but I haven't tested locally. It does not work on the Github Workflow due to no sudo access (and that it's in docker and I don't believe can actually be started/stopped in the test runner).
I'm not sure what type of local machine you use, but I would appreciate you testing this and reporting back. Thanks!
@cpursley Thanks for the effort!
I'm using M1 Mac (Apple Silicon) locally (and Supabase on prod).
After adding some codes as homebrew installs Postgres in "/opt/homebrew/Cellar/postgresql"
, the tests working well.
But the test cases are still passing with the random slot names, meaning that the test cases are not covering the case my prod server is experiencing.
I'll try to write a test case covering my error case.
I've spent roughly a week attempting to replicate the issue, but haven't succeeded 🥲. I plan to switch the approach and implement thorough logging within the library. I could either tailor the logging only for my use or make it a pull request for the community. Which option would you prefer?
Feel free to put in logging! We can always remove later when the issue is resolved.
Also, could you submit a change to this branch with your homebrew related changes? But please modify so that the version number is dynamic instead of hard coded.
Hi @DohanKim ~ any thoughts on my previous comments?
@cpursley sorry for the late reply. I'm running late on my service upgrade 🥲.
Anyway, I had put a couple of loggings inside my app instead of WalEx and found some clues. After DB reconnection, the temporary replication slot WalEx is using is closed and a new one is not opened when DB recovers from the error.
I'll spend at most a couple of days reproducing the error again in the test code, and if it's not successful, again, I'll start putting logging and creating PR. Sounds okay?
Also, I'll submit PR with homebrew related codes.
@DohanKim Thanks for the changes. I went ahead and merged them to make it easer (so we can start new branches as needed).
It happens that sometimes the DB connection is closed. Even though other processes are restarted and reconnected shortly after the temporary connection issue is resolved, Walex just stopped working.
Can you give me some ideas and how to implement them? (ex: reconnecting after exponential backoff) @cpursley