
Logical Replication transactions applied multiple times #6977

Closed: save-buffer closed this issue 4 months ago

save-buffer commented 7 months ago

Steps to reproduce

Make Neon a subscriber to a workload that produces several inserts per second on a table with a primary key. Then restart the Neon compute. Replication will fail with a duplicate key error on insert.

Expected result

No duplicate key insert

Actual result

Duplicate key insert

Environment

Logs, links

Discussion: https://neondb.slack.com/archives/C04DGM6SMTM/p1708363190710839

It seems that the pageserver isn't persisting replication origin (replorigin) advancement, so when the compute restarts, its origin position is whatever was recorded in the last checkpoint.

kelvich commented 7 months ago

To achieve exactly-once semantics for incoming logical replication, postgres stamps each commit record with a 10-byte tuple (origin_id, origin_lsn), where origin_id is a 2-byte unique identifier of the replication source and origin_lsn is the 8-byte LSN on the source side. Postgres maintains an in-memory mapping of origin_id => origin_lsn and writes it to disk on each checkpoint, including the shutdown checkpoint. On a normal start postgres just reads this on-disk state. When starting after a crash, postgres loads the last on-disk state and, during WAL replay, updates the in-memory values on each commit record, ending up in a consistent state.

That way postgres always knows the remote LSN of the last applied record, and on a replication reconnect it can ask to continue replication from the proper point. For context: postgres replication is somewhat of a pull model -- it is up to the replica to reconnect and ask for data starting from some LSN.
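
In code form, the bookkeeping above looks roughly like this (a minimal Rust sketch with illustrative names; Postgres itself implements this in C):

```rust
use std::collections::HashMap;

// Illustrative aliases: origin_id is 2 bytes, origin_lsn is 8 bytes.
type OriginId = u16;
type Lsn = u64;

#[derive(Default)]
struct ReplOriginMap {
    progress: HashMap<OriginId, Lsn>, // in-memory origin_id => origin_lsn
}

impl ReplOriginMap {
    /// Applied for every commit record that carries an origin stamp.
    fn on_commit(&mut self, origin_id: OriginId, origin_lsn: Lsn) {
        let slot = self.progress.entry(origin_id).or_insert(0);
        if origin_lsn > *slot {
            *slot = origin_lsn;
        }
    }

    /// Written to disk on every checkpoint, including the shutdown one.
    fn checkpoint_state(&self) -> Vec<(OriginId, Lsn)> {
        let mut state: Vec<_> = self.progress.iter().map(|(&id, &lsn)| (id, lsn)).collect();
        state.sort_unstable();
        state
    }

    /// Crash recovery: load the last checkpointed state, then call
    /// on_commit() again for every commit record replayed from WAL.
    fn restore(snapshot: Vec<(OriginId, Lsn)>) -> Self {
        Self { progress: snapshot.into_iter().collect() }
    }
}
```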

How can we support it in Neon:

  1. There are already WAL records for creating and dropping replication origins (xl_replorigin_set / xl_replorigin_drop). No need to generate aux file logical messages for repl. origins.
  2. We store all repl. origins in one more directory-like structure. It could be a sorted array of [[origin_id, lsn], ...] (denser) or a hashmap as before (see the sketch after this list).
  3. Commit records serve as update records for repl. origins. We maintain an in-memory commit counter, and every 1k commits we write out the new state of repl. origins if there were any changes.
  4. On basebackup:
    • read the disk state of repl. origins
    • find the last 1k of commit records (IIUC they are now attached as updates to CLOG and we somewhat simulate a range scan?) and replay them to get the latest origin_lsn values
    • include up-to-date files in the basebackup archive
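
A minimal sketch of the directory-like value from step 2, assuming a dense fixed-width encoding (the 10-byte-per-entry layout and function names are assumptions, not the pageserver's actual format):

```rust
type OriginId = u16;
type Lsn = u64;

// Entries are kept sorted by origin_id, so lookups can binary-search.
fn serialize_origins(origins: &[(OriginId, Lsn)]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(origins.len() * 10);
    for &(id, lsn) in origins {
        buf.extend_from_slice(&id.to_be_bytes());  // 2 bytes
        buf.extend_from_slice(&lsn.to_be_bytes()); // 8 bytes
    }
    buf
}

fn deserialize_origins(buf: &[u8]) -> Vec<(OriginId, Lsn)> {
    buf.chunks_exact(10)
        .map(|entry| {
            let id = u16::from_be_bytes([entry[0], entry[1]]);
            let lsn = u64::from_be_bytes(entry[2..10].try_into().unwrap());
            (id, lsn)
        })
        .collect()
}
```
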
knizhnik commented 7 months ago

To achieve exactly once semantics for incoming logical replication postgres stamps each commit record with a 10-byte tuple of (origin_id, origin_lsn), where origin_id is two byte unique identifier of the replication source and origin_lsn is 8-byte lsn on the source side.

Formally speaking, that is not precisely true. origin_id is not stored in the commit record's data itself; it is carried under a special kind of block_id (alongside XLR_BLOCK_ID_DATA_SHORT and XLR_BLOCK_ID_DATA_LONG there is XLR_BLOCK_ID_ORIGIN).

A commit record may additionally include origin_lsn and origin_timestamp (16 bytes in total) if XACT_XINFO_HAS_ORIGIN is set in xinfo. But that is a detail rather than a matter of principle.
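
For reference, the constants and the 16-byte origin payload mentioned above, transcribed into Rust from the Postgres xlogrecord.h and xact.h headers (a reference sketch, not Neon's actual WAL decoder):

```rust
// Special block IDs from Postgres xlogrecord.h (values from upstream headers).
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
const XLR_BLOCK_ID_DATA_LONG: u8 = 254;
const XLR_BLOCK_ID_ORIGIN: u8 = 253; // a 2-byte RepOriginId follows this block id

// xact.h: the commit record carries origin info iff this bit is set in xinfo.
const XACT_XINFO_HAS_ORIGIN: u32 = 1 << 5;

/// Transcription of xl_xact_origin (16 bytes total), present in the commit
/// record's main data only when XACT_XINFO_HAS_ORIGIN is set.
struct XlXactOrigin {
    origin_lsn: u64,       // XLogRecPtr on the source side
    origin_timestamp: i64, // TimestampTz
}
```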

knizhnik commented 7 months ago

There are already wal records for creating and dropping replication origins (xl_replorigin_set / xl_replorigin_drop). No need to generate aux file logical messages for repl. origins.

Well, the presence of a WAL record doesn't remove the need for a key with which this WAL record will be associated. Please notice that our storage is a key-value storage. For example, commit records are currently associated with CLOG pages. AUX_FILE is such a key with which xl_replorigin_set / xl_replorigin_drop can be associated (right now they are stored not as WAL records but just as a single-file entry, but in principle it is the same).
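
To make the keying point concrete, a hypothetical key space might look like this (variant names are illustrative only, not the pageserver's actual key encoding):

```rust
// Every ingested WAL record must be attributed to some key in the KV store.
enum StorageKey {
    ClogPage(u32),   // commit records are folded into CLOG page updates
    AuxFile(String), // e.g. replication slot state files
    ReplOrigin(u16), // a possible key for xl_replorigin_set / xl_replorigin_drop
}
```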

We store all repl. origins in one more directory-like structure. Could be sorted array of [[origin_id, lsn],...] (denser) or hashmap as before.

No problem with it. I have already implemented this part.

Commit records serve as update records for repl. origins. We maintain in-memory commit counter and each 1k commits we write out new state of repl. origins in case there were some changes.

Do you mean that we should add a REPL_ORIGIN key and associate commit records with it? That means we would need to write each commit in two places: CLOG and the repl. origin key.

On basebackup: read disk state of repl. origins; find last 1k of commit records (IIUC they are now attached as updates to CLOG and we somewhat simulate range scan?) and replay them to get latest origin_lsn values; include up-to-date files in basebackup archive

Concerning your proposal (1-4):

There is no efficient way to locate the N last commits. Right now the original commit records are not stored in the KV storage at all: walingest stores its own CLOG update records instead.

But I do not understand why we would need to load the N last commits at all. If we introduce a REPL_ORIGIN(id) key, then we can associate repl. origin updates with this key, and we need to load just one value (the one preceding the basebackup LSN).

The question is how to find all origins. In Postgres each slot has its own origin and we just iterate through all slots. The pageserver knows nothing about replication slots, but it has an AUX file with the slot state. In principle it can fetch these files, parse them, and thus get the array of origins. Alternatively, we can use a range scan to find all available REPL_ORIGIN(id) keys. The latter approach seems less efficient but is easier to implement and doesn't require the pageserver to know the format of the replication slot state file. (A sketch of this variant follows below.)
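
A sketch of that range-scan variant (the KeyValueStore trait and both function names are assumptions for illustration, not the pageserver's real API):

```rust
type OriginId = u16;
type Lsn = u64;

trait KeyValueStore {
    /// Range scan over all REPL_ORIGIN(id) keys, returning for each origin
    /// the latest value at or before `at_lsn`.
    fn scan_repl_origins(&self, at_lsn: Lsn) -> Vec<(OriginId, Lsn)>;
}

/// Collect origin progress for a basebackup at `basebackup_lsn`; the result
/// would be written into the archive as the replication origin checkpoint
/// state that postgres reads on startup.
fn origins_for_basebackup(
    store: &dyn KeyValueStore,
    basebackup_lsn: Lsn,
) -> Vec<(OriginId, Lsn)> {
    // No replay of the last N commits is needed: each REPL_ORIGIN(id) key
    // already holds the newest origin_lsn preceding the basebackup LSN.
    store.scan_repl_origins(basebackup_lsn)
}
```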

hlinnaka commented 4 months ago

This was fixed by https://github.com/neondatabase/neon/pull/7099