jackc / pglogrepl

PostgreSQL logical replication library for Go.
MIT License
319 stars 59 forks source link

How to resume replication? #40

Open zhanghaiyang9999 opened 1 year ago

zhanghaiyang9999 commented 1 year ago

If I created a publication and replication slot, added and updated some table records. later I exited my program but the table records still be updated or added. How can I resume the replication from last time when my program exited? For example, I replicated 5 records while my program is running, after my program exited, added another 5 new records into postgresql tables, How can I continue to replicate the new table records from number 6 to 10 if my program run again? call pglogrepl.SendStandbyStatusUpdate method or other methods? @jackc Thanks!

diabloneo commented 1 year ago

Firstly, your program should send status update message to Postgres, like this:

    err = pglogrepl.SendStandbyStatusUpdate(ctx, c.conn,
        pglogrepl.StandbyStatusUpdate{WALFlushPosition: c.lastWriteWALPos})

The lastWriteWALPos can be fetched from CommitMessage.CommitLSN:

// CommitMessage is a commit message.
type CommitMessage struct {
    baseMessage
    // Flags currently unused (must be 0).
    Flags uint8
    // CommitLSN is the LSN of the commit.
    CommitLSN LSN
    // TransactionEndLSN is the end LSN of the transaction.
    TransactionEndLSN LSN
    // CommitTime is the commit timestamp of the transaction
    CommitTime time.Time
}

Then, if your program records the last CommitLSN sent to the postgres, it can start replication with this as the restartLSN :

    err = pglogrepl.StartReplication(ctx, c.conn,
        c.slotName, c.restartLSN,
        pglogrepl.StartReplicationOptions{
            Mode: pglogrepl.LogicalReplication,
            PluginArgs: []string{
                "proto_version '1'",
                fmt.Sprintf("publication_names '%s'", c.pubName),
            },
        },
    )
SimpleSoft-2020 commented 1 year ago

Firstly, your program should send status update message to Postgres, like this:

  err = pglogrepl.SendStandbyStatusUpdate(ctx, c.conn,
      pglogrepl.StandbyStatusUpdate{WALFlushPosition: c.lastWriteWALPos})

The lastWriteWALPos can be fetched from CommitMessage.CommitLSN:

// CommitMessage is a commit message.
type CommitMessage struct {
  baseMessage
  // Flags currently unused (must be 0).
  Flags uint8
  // CommitLSN is the LSN of the commit.
  CommitLSN LSN
  // TransactionEndLSN is the end LSN of the transaction.
  TransactionEndLSN LSN
  // CommitTime is the commit timestamp of the transaction
  CommitTime time.Time
}

Then, if your program records the last CommitLSN sent to the postgres, it can start replication with this as the restartLSN :

  err = pglogrepl.StartReplication(ctx, c.conn,
      c.slotName, c.restartLSN,
      pglogrepl.StartReplicationOptions{
          Mode: pglogrepl.LogicalReplication,
          PluginArgs: []string{
              "proto_version '1'",
              fmt.Sprintf("publication_names '%s'", c.pubName),
          },
      },
  )

Thanks @diabloneo !I will try this! thanks again!

zhanghaiyang9999 commented 1 year ago

Firstly, your program should send status update message to Postgres, like this:

  err = pglogrepl.SendStandbyStatusUpdate(ctx, c.conn,
      pglogrepl.StandbyStatusUpdate{WALFlushPosition: c.lastWriteWALPos})

The lastWriteWALPos can be fetched from CommitMessage.CommitLSN:

// CommitMessage is a commit message.
type CommitMessage struct {
  baseMessage
  // Flags currently unused (must be 0).
  Flags uint8
  // CommitLSN is the LSN of the commit.
  CommitLSN LSN
  // TransactionEndLSN is the end LSN of the transaction.
  TransactionEndLSN LSN
  // CommitTime is the commit timestamp of the transaction
  CommitTime time.Time
}

Then, if your program records the last CommitLSN sent to the postgres, it can start replication with this as the restartLSN :

  err = pglogrepl.StartReplication(ctx, c.conn,
      c.slotName, c.restartLSN,
      pglogrepl.StartReplicationOptions{
          Mode: pglogrepl.LogicalReplication,
          PluginArgs: []string{
              "proto_version '1'",
              fmt.Sprintf("publication_names '%s'", c.pubName),
          },
      },
  )

hi @diabloneo , I want to use clientXLogPos as the restartLSN, clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData))

but the clientXLogPos's value will become from bigger to smaller and then from smaller to bigger, like this:

clientXLogPos: 1/337E38ED clientXLogPos: 0/4C clientXLogPos: 0/181 clientXLogPos: 1/337EBA02

Can clientXLogPos's be the restartLSN for the resume replication case? thanks!

zhanghaiyang9999 commented 1 year ago

CommitMessage.CommitLSN

Hi @diabloneo If use the CommitMessage.CommitLSN as the restart point of replication ,then there is an issue here. For example, I updated some records(say 10000 records) in one commit(for example, the sql: update foo set a=123 where id > 0 and id <=10000), How can I resume the replication form the middle record such as id=5000?

RollerKing commented 1 year ago

why using CommitMessage.CommitLSN as lastWriteWALPos rather than xld.WALStart + pglogrepl.LSN(len(xld.WALData))? what is the different between these tow value?

ubombi commented 1 year ago

why using CommitMessage.CommitLSN as lastWriteWALPos rather than xld.WALStart + pglogrepl.LSN(len(xld.WALData))? what is the different between these tow value?

well, it's actually wrong. I had the same question and issues with it. You should commit pglogrepl.PrimaryKeepaliveMessage.ServerWALEnd which is not the end of wal file, but current pointer of logical decoder.

More (my investigation): https://stackoverflow.com/questions/71016200/proper-standby-status-update-in-streaming-replication-protocol

If you don't do so, you will not keep up with Postgres, when lot's of updates would happen to different database on the same instance. Resulting in:

If you need to commit more often that PrimaryKeepaliveMessage, use pglogrepl.CommitMessage.CommitLSN

genezhang commented 1 year ago

Just to add what I did recently, in hope this may help. I don't keep track of transaction boundaries as probably should as the target system is for OLAP and does not care about transaction boundaries. I keep WALStart in each record as PgLSN to track the position the replication is up to, and once a record is written to the target system, its PgLSN+1 (as curLSN below) is used in StandbyStatusUpdate() to update slot's confirmed_flush_lsn on the server. When you resume replication from where you left last time, you should find the Max(PgLSN)+1 from the target system, and use it as curLSN to filter out what you receive from the slot (use condition WALStart < curLSN) as PG seems to search for a transaction boundary when you start_replication and it repeats records for the whole transaction, based confirmed_flush_lsn of the slot, if it's not on a commit. If you don't keep track of the LSNs on the target system (or in a third place), you need to prepare for duplicates when starting replication.

soasada commented 1 year ago

@ubombi do you have a minimal example of using it?

After some investigation I found a way of resuming replication.

The docs make it clear, use confirmed_flush_lsn when using the START_REPLICATION SLOT command. confirmed_flush_lsn is The address (LSN) up to which the logical slot's consumer has confirmed receiving data. Data older than this is not available anymore. NULL for physical slots. (link to docs) So something like this would give you the LSN that the server has confirmed:

SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1

The confirmed LSN is committed using the SendStandbyStatusUpdate function, thus the xld.WALStart + pglogrepl.LSN(len(xld.WALData)) is ok to use.

Furthermore, if we read the postgres code there says that if you specify 0 as LSN it uses the confirmed_flush_lsn by default. In summary, resuming the replication is as easy as using the following function:

err = pglogrepl.StartReplication(context.Background(), conn, slotName, pglogrepl.LSN(0), pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
zachmu commented 6 months ago

@soasada Thanks for the insight, but I want to add that depending on your use case 0/0 is not safe to use in all cases. The issue is that if your system writes an update from the primary and then the process dies before sending the standby status message, postgres will have an incorrect view into your system's state. From my experimentation, it seems that it's also possible for Postgres to receive a standby message but not process it, e.g. if the standby disconnects soon after the standby message is sent.

So for these reasons, I recommend not relying on the 0/0 behavior, but storing the last flushed WAL position locally and always providing it with the START_REPLICATION command.

I wrote a deep dive on this issue and others I encountered while integrating with this library, hopefully it helps somebody:

https://www.dolthub.com/blog/2024-03-08-postgres-logical-replication/