electric-sql / electric

Sync little subsets of your Postgres data into local apps and services.
https://electric-sql.com
Apache License 2.0

Delay reconnection attempts when the replication connection is closed with the "admin_shutdown" reason #957

Closed: alco closed this issue 3 months ago

alco commented 9 months ago

@justindotpub shared on Discord their experience of running Electric alongside a Phoenix app.

One pain point we could improve is letting the Phoenix app drop and recreate the database while Electric keeps running in the background. For that, Electric must close its replication connection so that the database can be dropped. This can be partially achieved by calling pg_terminate_backend() to terminate every other backend connected to the database, including the Postgres end of Electric's replication connection:

SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = 'hello_electric_dev'
  AND pid <> pg_backend_pid();

However, Electric reconnects before Ecto has time to drop the database.

I'm proposing that we delay Electric's reconnection attempt if it sees that the connection was closed on purpose. Here's what the closure looks like on Electric's side:

iex(11)> 07:12:19.594 pid=<0.1009.0> origin=postgres_1 [debug] Electric.Replication.Postgres.Client.connect(%{database: ~c"hello_electric_dev", host: ~c"localhost", ip_addr: ~c"::1", ipv6: true, nulls: [nil, :null, :undefined], password: ~c"******", port: 5432, ssl: true, ssl_opts: [server_name_indication: ~c"localhost"], tcp_opts: [:inet6], timeout: 5000, username: ~c"electric"})
07:12:19.599 pid=<0.1008.0> [error] GenServer #PID<0.1008.0> terminating
** (stop) {:error, {:error, :fatal, "57P01", :admin_shutdown, "terminating connection due to administrator command", [file: "postgres.c", line: "3206", routine: "ProcessInterrupts", severity: "FATAL"]}}
Last message: {:tcp, #Port<0.24>, <<69, 0, 0, 0, 116, 83, 70, 65, 84, 65, 76, 0, 86, 70, 65, 84, 65, 76, 0, 67, 53, 55, 80, 48, 49, 0, 77, 116, 101, 114, 109, 105, 110, 97, 116, 105, 110, 103, 32, 99, 111, 110, 110, 101, 99, 116, 105, ...>>}
State: {:state, :gen_tcp, #Port<0.24>, "", {4471, -1328951954}, :on_replication, {:codec, %{nulls: [nil, :null, :undefined]}, [nil, :null, :undefined], {:oid_db, %{869 => {:type, 869, :inet, false, :undefined, :epgsql_codec_net, []}, 1001 => {:type, 1001, :bytea, true, 17, :epgsql_codec_text, []}, 23 => {:type, 23, :int4, false, :undefined, :epgsql_codec_integer, []}, 775 => {:type, 775, :macaddr8, true, 774, :epgsql_codec_net, []}, 3926 => {:type, 3926, :int8range, false, :undefined, :epgsql_codec_intrange, []}, 1040 => {:type, 1040, :macaddr, true, 829, :epgsql_codec_net, []}, 1186 => {:type, 1186, :interval, false, :undefined, :epgsql_codec_datetime, :epgsql_idatetime}, 774 => {:type, 774, :macaddr8, false, :undefined, :epgsql_codec_net, []}, 1187 => {:type, 1187, :interval, true, 1186, :epgsql_codec_datetime, :epgsql_idatetime}, 829 => {:type, 829, :macaddr, false, :undefined, :epgsql_codec_net, []}, 3802 => {:type, 3802, :jsonb, false, :undefined, :epgsql_codec_json, []}, 1015 => {:type, 1015, :varchar, true, 1043, :epgsql_codec_text, []}, 3905 => {:type, 3905, :int4range, true, 3904, :epgsql_codec_intrange, []}, 701 => {:type, 701, :float8, false, :undefined, :epgsql_codec_float, []}, 1000 => {:type, 1000, :bool, true, 16, :epgsql_codec_boolean, []}, 1009 => {:type, 1009, :text, true, 25, :epgsql_codec_text, []}, 25 => {:type, 25, :text, false, :undefined, :epgsql_codec_text, []}, 199 => {:type, 199, :json, true, 114, :epgsql_codec_json, []}, 1014 => {:type, 1014, :bpchar, true, 1042, :epgsql_codec_bpchar, []}, 1043 => {:type, 1043, :varchar, false, :undefined, :epgsql_codec_text, []}, 3911 => {:type, 3911, :tstzrange, true, 3910, :epgsql_codec_timerange, :epgsql_idatetime}, 2950 => {:type, 2950, :uuid, false, :undefined, :epgsql_codec_uuid, []}, 2951 => {:type, 2951, :uuid, true, 2950, :epgsql_codec_uuid, []}, 1083 => {:type, 1083, :time, false, :undefined, :epgsql_codec_datetime, :epgsql_idatetime}, 1007 => {:type, 1007, :int4, true, 23, 
:epgsql_codec_integer, []}, 20 => {:type, 20, :int8, false, :undefined, :epgsql_codec_integer, []}, 1183 => {:type, 1183, :time, true, 1083, :epgsql_codec_datetime, :epgsql_idatetime}, 3913 => {:type, 3913, :daterange, true, 3912, :epgsql_codec_timerange, :epgsql_idatetime}, 1082 => {:type, 1082, :date, false, :undefined, :epgsql_codec_datetime, :epgsql_idatetime}, 3807 => {:type, 3807, :jsonb, true, 3802, :epgsql_codec_json, []}, 3927 => {:type, 3927, :int8range, true, 3926, :epgsql_codec_intrange, ...}, 3910 => {:type, 3910, :tstzrange, false, :undefined, ...}, 1002 => {:type, 1002, :char, true, ...}, 17 => {:type, 17, :bytea, ...}, 1005 => {:type, 1005, ...}, 3908 => {:type, ...}, 1182 => {...}, ...}, %{{:float4, false} => 700, {:text, false} => 25, {:char, true} => 1002, {:time, false} => 1083, {:timetz, false} => 1266, {:timestamptz, true} => 1185, {:bpchar, true} => 1014, {:cidr, true} => 651, {:jsonb, false} => 3802, {:tsrange, true} => 3909, {:inet, false} => 869, {:float8, true} => 1022, {:bool, true} => 1000, {:timestamp, false} => 1114, {:int4, false} => 23, {:macaddr, true} => 1040, {:macaddr8, true} => 775, {:bpchar, false} => 1042, {:int4range, true} => 3905, {:varchar, true} => 1015, {:date, true} => 1182, {:uuid, false} => 2950, {:interval, false} => 1186, {:int4range, false} => 3904, {:point, true} => 1017, {:tsrange, false} => 3908, {:int8range, true} => 3927, {:tstzrange, true} => 3911, {:interval, true} => 1187, {:bytea, false} => 17, {:int2, true} => 1005, {:macaddr, false} => 829, {:text, true} => 1009, {:int8, true} => 1016, {:int8, ...} => 20, {...} => 1021, ...}}}, {[], []}, :undefined, :undefined, :undefined, :undefined, [{"application_name", ""}, {"client_encoding", "UTF8"}, {"DateStyle", "ISO, DMY"}, {"default_transaction_read_only", "off"}, {"in_hot_standby", "off"}, {"integer_datetimes", "on"}, {"IntervalStyle", "postgres"}, {"is_superuser", "on"}, {"server_encoding", "UTF8"}, {"server_version", "14.9 (Homebrew)"}, 
{"session_authorization", "electric"}, {"standard_conforming_strings", "on"}, {"TimeZone", "UTC"}], :information_redacted, [], :undefined, 73, :undefined, {:repl, 15722849968, 15722849912, 15722849912, false, :undefined, :undefined, #PID<0.1007.0>, false}, %{database: ~c"hello_electric_dev", host: {0, 0, 0, 0, 0, 0, 0, 1}, nulls: [nil, :null, :undefined], port: 5432, replication: ~c"database", ssl: true, ssl_opts: [server_name_indication: ~c"localhost"], tcp_opts: [:inet6], timeout: 5000}}
07:12:19.601 pid=<0.684.0> pg_producer=postgres_1 [warning] producer is down: {:via, :gproc, {:n, :l, {Electric.Replication.Postgres.LogicalReplicationProducer, "postgres_1"}}}
07:12:19.604 pid=<0.1009.0> origin=postgres_1 [debug] Electric.Replication.Postgres.Writer started, registered as {:via, :gproc, {:n, :l, {Electric.Replication.Postgres.Writer, "postgres_1"}}}, subscribed to {:n, :l, {Electric.Replication.SatelliteCollectorProducer, :default}}
07:12:19.604 pid=<0.680.0> [debug] Subscription request to satellite collector producer from consumer with [cancel: :temporary, min_demand: 10, max_demand: 50]
07:12:19.604 pid=<0.1011.0> [debug] Elixir.Electric.Replication.Postgres.LogicalReplicationProducer.init: publication: 'electric_publication', slot: 'electric_replication_out_hello_electric_dev'
07:12:19.604 pid=<0.1011.0> [info] Starting replication from postgres_1
07:12:19.604 pid=<0.684.0> pg_producer=postgres_1 [debug] request subscription to #PID<0.1011.0>
07:12:19.604 pid=<0.1011.0> [info] Electric.Replication.Postgres.LogicalReplicationProducer.init(%{database: ~c"hello_electric_dev", host: ~c"localhost", ip_addr: ~c"::1", ipv6: true, nulls: [nil, :null, :undefined], password: ~c"******", port: 5432, replication: ~c"database", ssl: true, ssl_opts: [server_name_indication: ~c"localhost"], tcp_opts: [:inet6], timeout: 5000, username: ~c"electric"})
07:12:19.604 pid=<0.1011.0> [debug] Electric.Replication.Postgres.Client.connect(%{database: ~c"hello_electric_dev", host: ~c"localhost", ip_addr: ~c"::1", ipv6: true, nulls: [nil, :null, :undefined], password: ~c"******", port: 5432, replication: ~c"database", ssl: true, ssl_opts: [server_name_indication: ~c"localhost"], tcp_opts: [:inet6], timeout: 5000, username: ~c"electric"})
07:12:19.609 pid=<0.1011.0> [debug] Elixir.Electric.Replication.Postgres.Client: CREATE_REPLICATION_SLOT "electric_replication_out_hello_electric_dev" LOGICAL pgoutput NOEXPORT_SNAPSHOT
07:12:19.613 pid=<0.1011.0> [debug] Elixir.Electric.Replication.Postgres.Client start_replication: slot: 'electric_replication_out_hello_electric_dev', publication: 'electric_publication'
07:12:19.614 pid=<0.684.0> pg_producer=postgres_1 [debug] ---- Filtering %Electric.Replication.Changes.UpdatedRecord{relation: {"electric", "transaction_marker"}, old_record: %{}, record: %{"content" => "\"75a7f171-5f2c-4c80-9526-d9870f5d9644\"", "id" => "cluster_id"}, tags: [], changed_columns: MapSet.new(["content", "id"])}
07:12:19.614 pid=<0.687.0> component=CachedWal.EtsBacked origin=postgres_1 [debug] Acknowledging 3/A927A798
07:12:19.614 pid=<0.687.0> component=CachedWal.EtsBacked [debug] Saving transaction 4190674 at 3/A927A798 with changes []
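
To make the proposal concrete, here is a rough sketch of the decision logic, written in Python for illustration only. It is not Electric's code. It assumes the standard Postgres wire format for ErrorResponse messages (type byte `E`, a 4-byte length, then NUL-terminated fields, where field `C` carries the SQLSTATE), which is what the `Last message` bytes in the log above show. The function names and all delay values (`admin_shutdown_delay`, `base`, `cap`) are hypothetical.

```python
import struct

ADMIN_SHUTDOWN = "57P01"  # SQLSTATE seen in the log above


def build_error_response(fields: dict) -> bytes:
    """Encode an ErrorResponse message (used here only to produce sample input)."""
    body = b"".join(
        code.encode() + value.encode() + b"\x00" for code, value in fields.items()
    ) + b"\x00"
    # The length word counts itself (4 bytes) but not the leading type byte.
    return b"E" + struct.pack("!I", len(body) + 4) + body


def parse_error_response(message: bytes) -> dict:
    """Decode an ErrorResponse message into {field_code: value}."""
    if message[:1] != b"E":
        raise ValueError("not an ErrorResponse message")
    (length,) = struct.unpack("!I", message[1:5])
    body = message[5 : 1 + length]
    fields = {}
    for chunk in body.split(b"\x00"):
        if chunk:  # skip the empty pieces left by the terminating NUL bytes
            fields[chr(chunk[0])] = chunk[1:].decode("utf-8")
    return fields


def reconnect_delay(
    fields: dict,
    attempt: int,
    base: float = 0.5,                  # hypothetical value
    admin_shutdown_delay: float = 5.0,  # hypothetical value
    cap: float = 30.0,                  # hypothetical value
) -> float:
    """Return how many seconds to wait before the next connection attempt."""
    if fields.get("C") == ADMIN_SHUTDOWN:
        # The server closed the connection on purpose: wait a fixed grace
        # period so that whoever terminated us can finish (e.g. drop the DB).
        return admin_shutdown_delay
    # Any other failure: ordinary capped exponential backoff.
    return min(cap, base * 2 ** attempt)


if __name__ == "__main__":
    sample = build_error_response(
        {
            "S": "FATAL",
            "V": "FATAL",
            "C": "57P01",
            "M": "terminating connection due to administrator command",
        }
    )
    fields = parse_error_response(sample)
    print(fields["C"], reconnect_delay(fields, attempt=0))
```

The point of the fixed grace period is that an `admin_shutdown` is not a transient network failure, so retrying immediately only races with whoever issued the termination. How long the delay should be, and whether it should be configurable, is left open here.
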

linear[bot] commented 9 months ago

VAX-1669 Delay reconnection attempts when the replication connection is closed with the "admin_shutdown" reason

KyleAMathews commented 3 months ago

👋 We've been working for the last month on a rebuild of the Electric server in a temporary repo: https://github.com/electric-sql/electric-next/

You can read more about why we made the decision at https://next.electric-sql.com/about

We're really excited about all the new possibilities the new server brings and we hope you'll check it out soon and give us your feedback.

We're now moving the temporary repo back here. As part of that migration we're closing all the old issues and PRs. We really appreciate you taking the time to investigate and report this issue!