electric-sql / electric

Sync little subsets of your Postgres data into local apps and services.
https://electric-sql.com
Apache License 2.0

Contrary to our assumption, LSN doesn't seem to grow monotonically over the lifetime of a replication connection #1548

Closed by alco 2 weeks ago

alco commented 3 weeks ago

We see occasional test failures on CI caused by receiving a logical message with an LSN that is lower than the LSN of the previously received message:

16:06:30.626 [info] Loading 137 CA(s) from :otp store
Running ExUnit with seed: 747171, max_cases: 8

.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

  1) test ReplicationClient against real db returns data formatted according to display settings (Electric.Postgres.ReplicationClientTest)
Error:      test/electric/postgres/replication_client_test.exs:155
     ** (EXIT from #PID<0.1592.0>) an exception was raised:

          ** (FunctionClauseError) no function clause matching in Electric.Postgres.ReplicationClient.update_received_wal/4

          The following arguments were given to Electric.Postgres.ReplicationClient.update_received_wal/4:

              # 1
              :xlog_data

              # 2
              %Electric.Postgres.ReplicationClient.State{
                transaction_received: {Electric.Postgres.ReplicationClientTest, :test_transaction_received, [#PID<0.1592.0>]},
                publication_name: "test_electric_publication",
                try_creating_publication?: false,
                start_streaming?: true,
                slot_name: "test_electric_slot",
                display_settings: [],
                origin: "postgres",
                txn_collector: %Electric.Postgres.ReplicationClient.Collector{
                  transaction: nil, tx_op_index: nil, relations: %{}},
                step: :streaming,
                received_wal: 33192064,
                applied_wal: 0
              }

              # 3
              33191768

              # 4
              33191768

          Attempted function clauses (showing 3 out of 3):

              defp update_received_wal(_step, state, _, -0-)
              defp update_received_wal(_step, %{received_wal: wal} = state, _, -wal-)
              defp update_received_wal(_step, state, _, wal) when -wal > state.received_wal-

          stacktrace:
            (electric 0.3.1) lib/electric/postgres/replication_client.ex:225: Electric.Postgres.ReplicationClient.update_received_wal/4
            (electric 0.3.1) lib/electric/postgres/replication_client.ex:153: Electric.Postgres.ReplicationClient.handle_data/2
            (postgrex 0.19.0) lib/postgrex/replication_connection.ex:551: Postgrex.ReplicationConnection.handle/5
            (postgrex 0.19.0) lib/postgrex/replication_connection.ex:537: Postgrex.ReplicationConnection.handle_data/2
            (stdlib 6.0.1) gen_statem.erl:3115: :gen_statem.loop_state_callback/11
            (stdlib 6.0.1) proc_lib.erl:329: :proc_lib.init_p_do_apply/3

.......................................................
Finished in 6.5 seconds (4.4s async, 2.0s sync)
174 doctests, 351 tests, 1 failure
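
To make the failure concrete: the three clause heads in the trace accept an incoming WAL position only when it is 0, equal to `received_wal`, or greater than `received_wal`. The incoming value 33191768 is 296 bytes behind the stored 33192064, so no clause matches. Here is a minimal Python sketch of that dispatch. It is not Electric's Elixir code, and the return values are assumptions, since only the clause heads appear in the trace:

```python
class NoMatchingClause(Exception):
    """Stands in for Elixir's FunctionClauseError in this sketch."""


def update_received_wal(received_wal: int, wal: int) -> int:
    """Mirror the three clause heads shown in the error report.

    Returns the new value of received_wal. The return values are assumed;
    the trace only shows which inputs each clause accepts.
    """
    if wal == 0:
        return received_wal          # clause 1: literal 0
    if wal == received_wal:
        return received_wal          # clause 2: same position as before
    if wal > received_wal:
        return wal                   # clause 3: guard `wal > state.received_wal`
    # wal is non-zero and lower than received_wal: nothing matches.
    raise NoMatchingClause(
        f"wal={wal} is {received_wal - wal} bytes behind received_wal={received_wal}"
    )
```

Calling `update_received_wal(33192064, 33191768)` with the values from the CI failure raises `NoMatchingClause`, which corresponds to the crash in the report.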

I've also seen it happen once during normal operation of the sync service:

16:18:08.625 [debug] XLogData: wal_start=320170440, wal_end=320170440

16:18:08.625 [error] :gen_statem #PID<0.943.0> terminating
** (FunctionClauseError) no function clause matching in Electric.Postgres.ReplicationClient.update_received_wal/4
    (electric 0.2.6) lib/electric/postgres/replication_client.ex:225: Electric.Postgres.ReplicationClient.update_received_wal(:xlog_data, %Electric.Postgres.ReplicationClient.State{transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, []}, publication_name: "electric_publication", try_creating_publication?: true, start_streaming?: false, slot_name: "electric_slot", display_settings: [], origin: "postgres", txn_collector: %Electric.Postgres.ReplicationClient.Collector{transaction: nil, tx_op_index: nil, relations: %{21469 => %Electric.Postgres.LogicalReplication.Messages.Relation{id: 21469, namespace: "public", name: "issue", replica_identity: :all_columns, columns: [%Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "id", type_oid: 2950, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "title", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "description", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "priority", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "status", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "modified", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "created", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "kanbanorder", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "username", type_oid: 25, type_modifier: -1}]}}}, step: :streaming, received_wal: 320372448, applied_wal: 320170440}, 320170440, 320170440)
    (electric 0.2.6) lib/electric/postgres/replication_client.ex:153: Electric.Postgres.ReplicationClient.handle_data/2
    (postgrex 0.19.0) lib/postgrex/replication_connection.ex:551: Postgrex.ReplicationConnection.handle/5
    (postgrex 0.19.0) lib/postgrex/replication_connection.ex:537: Postgrex.ReplicationConnection.handle_data/2
    (stdlib 6.0.1) gen_statem.erl:3115: :gen_statem.loop_state_callback/11
    (stdlib 6.0.1) proc_lib.erl:329: :proc_lib.init_p_do_apply/3
Queue: [info: {:tcp, #Port<0.42>, <<100, 0, 0, 0, 50, 119, 0, 0, 0, 0, 19, 21, 105, 200, 0, 0, 0, 0, 19, 21, 105, 200, 0, 2, 194, 22, 130, 138, 207, 107, 66, 0, 0, 0, 0, 19, 25, 173, 8, 0, 2, 194, 22, 130, 138, 206, ...>>}]
Postponed: []
State: {:no_state, %Postgrex.ReplicationConnection{protocol: %Postgrex.Protocol{sock: {:gen_tcp, #Port<0.42>}, connection_id: 1373, connection_key: 859705860, peer: {{127, 0, 0, 1}, 54321}, types: {Postgrex.DefaultTypes, #Reference<0.1451127446.481427457.98467>}, null: nil, timeout: 15000, ping_timeout: 15000, parameters: #Reference<0.1451127446.481296385.100529>, queries: #Reference<0.1451127446.481427459.106953>, postgres: :idle, transactions: :naive, buffer: :active_once, disconnect_on_error_codes: [], scram: %{auth_message: "n=,r=EW7BwdjQSbZJUCpDkadZRmmK,r=EW7BwdjQSbZJUCpDkadZRmmK/5naQVd4gLwtYG1jov2TrDt6,s=yx30tGHreSSTMsoVW4niRA==,i=4096,c=biws,r=EW7BwdjQSbZJUCpDkadZRmmK/5naQVd4gLwtYG1jov2TrDt6", iterations: 4096, salt: <<203, 29, 244, 180, 97, 235, 121, 36, 147, 50, 202, 21, 91, 137, 226, 68>>}, disable_composite_types: false, messages: []}, state: {Electric.Postgres.ReplicationClient, %Electric.Postgres.ReplicationClient.State{transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, []}, publication_name: "electric_publication", try_creating_publication?: true, start_streaming?: false, slot_name: "electric_slot", display_settings: [], origin: "postgres", txn_collector: %Electric.Postgres.ReplicationClient.Collector{transaction: nil, tx_op_index: nil, relations: %{21469 => %Electric.Postgres.LogicalReplication.Messages.Relation{id: 21469, namespace: "public", name: "issue", replica_identity: :all_columns, columns: [%Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "id", type_oid: 2950, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "title", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "description", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "priority", type_oid: 25, type_modifier: -1}, 
%Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "status", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "modified", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "created", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "kanbanorder", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "username", type_oid: 25, type_modifier: -1}]}}}, step: :streaming, received_wal: 320372448, applied_wal: 320170440}}, auto_reconnect: false, reconnect_backoff: 500, streaming: 500}}
Callback mode: &Postgrex.ReplicationConnection.handle_event/4, state_enter: false

16:18:08.627 [error] GenServer Electric.ConnectionManager terminating
** (stop) {:replication_connection, {:function_clause, [{Electric.Postgres.ReplicationClient, :update_received_wal, [:xlog_data, %Electric.Postgres.ReplicationClient.State{transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, []}, publication_name: "electric_publication", try_creating_publication?: true, start_streaming?: false, slot_name: "electric_slot", display_settings: [], origin: "postgres", txn_collector: %Electric.Postgres.ReplicationClient.Collector{transaction: nil, tx_op_index: nil, relations: %{21469 => %Electric.Postgres.LogicalReplication.Messages.Relation{id: 21469, namespace: "public", name: "issue", replica_identity: :all_columns, columns: [%Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "id", type_oid: 2950, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "title", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "description", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "priority", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "status", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "modified", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "created", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "kanbanorder", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "username", type_oid: 25, type_modifier: -1}]}}}, step: :streaming, received_wal: 320372448, applied_wal: 320170440}, 320170440, 320170440], [file: 
~c"lib/electric/postgres/replication_client.ex", line: 225]}, {Electric.Postgres.ReplicationClient, :handle_data, 2, [file: ~c"lib/electric/postgres/replication_client.ex", line: 153]}, {Postgrex.ReplicationConnection, :handle, 5, [file: ~c"lib/postgrex/replication_connection.ex", line: 551]}, {Postgrex.ReplicationConnection, :handle_data, 2, [file: ~c"lib/postgrex/replication_connection.ex", line: 537]}, {:gen_statem, :loop_state_callback, 11, [file: ~c"gen_statem.erl", line: 3115]}, {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 329]}]}}
Last message: {:EXIT, #PID<0.943.0>, {:function_clause, [{Electric.Postgres.ReplicationClient, :update_received_wal, [:xlog_data, %Electric.Postgres.ReplicationClient.State{transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, []}, publication_name: "electric_publication", try_creating_publication?: true, start_streaming?: false, slot_name: "electric_slot", display_settings: [], origin: "postgres", txn_collector: %Electric.Postgres.ReplicationClient.Collector{transaction: nil, tx_op_index: nil, relations: %{21469 => %Electric.Postgres.LogicalReplication.Messages.Relation{id: 21469, namespace: "public", name: "issue", replica_identity: :all_columns, columns: [%Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "id", type_oid: 2950, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "title", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "description", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "priority", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "status", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "modified", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "created", type_oid: 1184, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "kanbanorder", type_oid: 25, type_modifier: -1}, %Electric.Postgres.LogicalReplication.Messages.Relation.Column{flags: [:key], name: "username", type_oid: 25, type_modifier: -1}]}}}, step: :streaming, received_wal: 320372448, applied_wal: 320170440}, 320170440, 320170440], [file: 
~c"lib/electric/postgres/replication_client.ex", line: 225]}, {Electric.Postgres.ReplicationClient, :handle_data, 2, [file: ~c"lib/electric/postgres/replication_client.ex", line: 153]}, {Postgrex.ReplicationConnection, :handle, 5, [file: ~c"lib/postgrex/replication_connection.ex", line: 551]}, {Postgrex.ReplicationConnection, :handle_data, 2, [file: ~c"lib/postgrex/replication_connection.ex", line: 537]}, {:gen_statem, :loop_state_callback, 11, [file: ~c"gen_statem.erl", line: 3115]}, {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 329]}]}}
State: %Electric.ConnectionManager.State{connection_opts: [socket_options: [], ssl: false, ipv6: false, hostname: "localhost", port: 54321, database: "electric", username: "postgres", password: "password", sslmode: :disable], replication_opts: [start_streaming?: false, publication_name: "electric_publication", try_creating_publication?: true, slot_name: "electric_slot", transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, []}], pool_opts: [name: Electric.DbPool, pool_size: 10, types: PgInterop.Postgrex.Types], replication_client_pid: #PID<0.943.0>, pool_pid: #PID<0.944.0>, backoff: {{:backoff, 1000, 10000, 1000, :normal, :undefined, :undefined}, nil}}

16:18:08.634 [debug] Found existing replication slot

16:18:08.665 [info] Starting replication from postgres
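
The first bytes of the TCP payload on the `Queue:` line of the crash report can be decoded by hand to cross-check the logged `wal_start`. The sketch below, in Python rather than Elixir, assumes the documented layout of the streaming replication protocol: a CopyData frame (`d`, Int32 length) wrapping an XLogData message (`w`, Int64 WAL start, Int64 WAL end, Int64 server clock), followed here by a pgoutput Begin message (`B`, Int64 final LSN of the transaction). The function name is hypothetical, and only the bytes visible before the log truncates are used:

```python
import struct

# Leading bytes of the queued {:tcp, ...} payload, copied from the log.
# The log truncates the rest of the message.
PAYLOAD = bytes([
    100, 0, 0, 0, 50,                    # 'd' CopyData, Int32 length
    119,                                 # 'w' XLogData
    0, 0, 0, 0, 19, 21, 105, 200,        # wal_start
    0, 0, 0, 0, 19, 21, 105, 200,        # wal_end
    0, 2, 194, 22, 130, 138, 207, 107,   # server clock
    66,                                  # 'B' Begin (pgoutput)
    0, 0, 0, 0, 19, 25, 173, 8,          # final LSN of the transaction
])


def decode_begin_frame(buf: bytes) -> dict:
    """Decode CopyData + XLogData headers and the start of a Begin message."""
    tag, length = struct.unpack_from(">cI", buf, 0)
    kind, wal_start, wal_end, _clock = struct.unpack_from(">cQQq", buf, 5)
    msg_type, final_lsn = struct.unpack_from(">cQ", buf, 30)
    if (tag, kind, msg_type) != (b"d", b"w", b"B"):
        raise ValueError("not a CopyData/XLogData/Begin frame")
    return {
        "length": length,
        "wal_start": wal_start,
        "wal_end": wal_end,
        "final_lsn": final_lsn,
    }


decoded = decode_begin_frame(PAYLOAD)
```

The decoded `wal_start` is 320170440, matching the `XLogData` debug line, while the Begin message carries a final LSN of 320449800. The stored `received_wal` of 320372448 lies between the two, which suggests the message belongs to a transaction that started before, and finished after, the last position the client had recorded.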

My attempts to reproduce it have been unsuccessful. Further investigation is needed to determine the conditions that lead to this failure.

kevin-dp commented 3 weeks ago

Could the bug be on our side?

The official Postgres documentation explicitly states that LSNs grow monotonically:

"WAL records are appended to the WAL files as each new record is written. The insert position is described by a Log Sequence Number (LSN) that is a byte offset into the WAL, increasing monotonically with each new record."

alco commented 3 weeks ago

@kevin-dp There's probably a better title for this issue but I couldn't come up with one.

The bug lies in the assumption we make about the LSNs of successive logical messages we receive from Postgres. By definition, Postgres writes to the WAL using monotonically increasing LSNs, but that doesn't rule out looser ordering when it streams logical messages to a replica. For example, Relation and Type messages aren't read directly from the primary's WAL but are generated on the fly.

Something in the way we store the latest seen LSN, combined with how Postgres resumes a previously interrupted replication stream from the replication slot, leads to the conflict described in this issue.

rkistner commented 3 weeks ago

Last time I checked, the Postgres documentation on this was not clear at all, and I ran into the same issue. The only real source of info for this is the Postgres source code.

In the original WAL, messages from concurrent transactions are interleaved. With logical replication, messages are then reordered and grouped by transaction. This means that:

  1. Within a single transaction, LSNs are strictly increasing.
  2. Over multiple transactions, the commit LSNs (or end_lsn?) for the transactions are strictly increasing.
  3. Over multiple transactions, the LSNs of messages inside the transactions may be out of order.

You can reproduce this by writing to Postgres using multiple transactions concurrently.
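
The three points above can be illustrated without a database. The following Python sketch is a toy model, not Postgres' actual reorder buffer: the LSNs, transaction IDs and the `decode` function are made up for illustration. It buffers WAL records per transaction and emits each transaction as a unit when its commit record is seen:

```python
from collections import defaultdict

# Hypothetical WAL: (lsn, xid, kind). Two concurrent transactions interleave,
# and LSNs increase with every record written.
WAL = [
    (100, 1, "begin"),
    (110, 2, "begin"),
    (120, 1, "insert"),
    (130, 2, "insert"),
    (140, 2, "commit"),
    (150, 1, "insert"),
    (160, 1, "commit"),
]


def decode(wal):
    """Group records by transaction and emit each group at its commit."""
    pending = defaultdict(list)
    stream = []
    for lsn, xid, kind in wal:
        pending[xid].append((lsn, xid, kind))
        if kind == "commit":
            stream.extend(pending.pop(xid))
    return stream


stream = decode(WAL)
streamed_lsns = [lsn for lsn, _xid, _kind in stream]
commit_lsns = [lsn for lsn, _xid, kind in stream if kind == "commit"]
```

In this model `streamed_lsns` is `[110, 130, 140, 100, 120, 150, 160]`: transaction 2 commits first and is streamed first, so the consumer sees LSN 100 after LSN 140. The commit LSNs, `[140, 160]`, are still strictly increasing, and LSNs within each transaction are in order.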

You can see how we handle this in PowerSync here.

alco commented 3 weeks ago

@rkistner Brilliant insight and at the right time. Thanks!

I've rethought our approach to keeping track of processed LSNs and reporting them back to Postgres.
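
The thread does not show what the new approach looks like. One possible shape is sketched below in Python, purely as an assumption and not as Electric's actual fix: treat commit LSNs as the only values expected to increase, and never move the stored position backwards when a message inside a transaction carries a lower LSN. The class and method names are hypothetical:

```python
class WalTracker:
    """Track WAL positions without assuming every message LSN increases."""

    def __init__(self) -> None:
        self.received_wal = 0   # highest LSN seen on any message
        self.applied_wal = 0    # commit LSN of the last fully processed txn

    def on_message(self, lsn: int) -> None:
        # Messages inside a transaction may carry an LSN lower than one
        # already seen, so keep the maximum instead of rejecting them.
        self.received_wal = max(self.received_wal, lsn)

    def on_commit(self, commit_lsn: int) -> None:
        # Commit LSNs are the values expected to increase across
        # transactions; max() keeps the tracker safe even if they do not.
        self.on_message(commit_lsn)
        self.applied_wal = max(self.applied_wal, commit_lsn)
```

Which of the two values is safe to report back to Postgres depends on what has been durably processed, and that decision is outside the scope of this sketch.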