pg-logical-replication
1. Install
$ npm install pg-logical-replication
2. Usage
- This is an example using
wal2json
. A replication slot(test_slot_wal2json
) must be created on the PostgreSQL server.
SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json')
const slotName = 'test_slot_wal2json';
const service = new LogicalReplicationService(
/**
* node-postgres Client options for connection
* https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
*/
{
database: 'playground',
// ...
},
/**
* Logical replication service config
* https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
*/
{
acknowledge: {
auto: true,
timeoutSeconds: 10
}
}
)
// `TestDecodingPlugin` for test_decoding and `ProtocolBuffersPlugin` for decoderbufs are also available.
const plugin = new Wal2JsonPlugin({
/**
* Plugin options for wal2json
* https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-options.type.ts
*/
//...
});
/**
* Wal2Json.Output
* https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
*/
service.on('data', (lsn: string, log: Wal2Json.Output) => {
// Do something what you want.
// log.change.filter((change) => change.kind === 'insert').length;
});
// Start subscribing to data change events.
(function proc() {
service.subscribe(plugin, slotName)
.catch((e) => {
console.error(e);
})
.then(() => {
setTimeout(proc, 100);
});
})();
3. LogicalReplicationService
3-1. Constructor(clientConfig: ClientConfig, config?: Partial<LogicalReplicationConfig>)
const service = new LogicalReplicationService(
/**
* node-postgres Client options for connection
* https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
*/
clientConfig: {
user?: string | undefined;
database?: string | undefined;
password?: string | (() => string | Promise<string>) | undefined;
port?: number | undefined;
host?: string | undefined;
connectionString?: string | undefined;
keepAlive?: boolean | undefined;
stream?: stream.Duplex | undefined;
statement_timeout?: false | number | undefined;
parseInputDatesAsUTC?: boolean | undefined;
ssl?: boolean | ConnectionOptions | undefined;
query_timeout?: number | undefined;
keepAliveInitialDelayMillis?: number | undefined;
idle_in_transaction_session_timeout?: number | undefined;
application_name?: string | undefined;
connectionTimeoutMillis?: number | undefined;
types?: CustomTypesConfig | undefined;
options?: string | undefined;
},
/**
* Logical replication service config
* https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
*/
config?: Partial<{
acknowledge?: {
/**
* If the value is false, acknowledge must be done manually.
* Default: true
*/
auto: boolean;
/**
* Acknowledge is performed every set time (sec). If 0, do not do it.
* Default: 10
*/
timeoutSeconds: 0 | 10 | number;
};
}>
)
3-2. subscribe(plugin: AbstractPlugin, slotName: string, uptoLsn?: string): Promise<this>
3-3. acknowledge(lsn: string): Promise<boolean>
- After processing the data, it signals the PostgreSQL server that it is OK to clear the WAL log.
- Usually this is done automatically.
- Manually use only when
new LogicalReplicationService({}, {acknowledge: {auto: false}})
.
3-4. Event
on(event: 'start', listener: () => Promise<void> | void)
- Emitted when start replication.
on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)
- Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
on(event: 'error', listener: (err: Error) => void)
on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)
- Emitted when acknowledging automatically.
on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)
- A heartbeat check signal has been received from the server. You may need to run
service.acknowledge()
.
3-5. Misc. method
stop(): Promise<this>
- Terminate the server's connection and stop replication.
isStop(): boolean
- Returns false when replication starts from the server.
lastLsn(): string
4. Output Plugins
4-1. PgoutputPlugin
for pgoutput (Native to PostgreSQL)
- Use the pgoutput plugin to process large-scale transactions.
4-2. Wal2JsonPlugin
for wal2json
4-3. ProtocolBuffersPlugin
for decoderbufs
4-4. TestDecodingPlugin
for test_decoding (Not recommended)
Contributors