
pgstream - Postgres replication with DDL changes

pgstream is an open source CDC command-line tool and library that offers Postgres replication support with DDL changes to any provided output.

Features

Usage

pgstream can be used via the readily available CLI or as a library.

CLI

Installation

Binaries

Binaries are available for Linux, macOS, and Windows; see our Releases.

From source

To install pgstream from the source, run the following command:

go install github.com/xataio/pgstream@latest

From package manager - Homebrew

To install pgstream with Homebrew, run the following commands:

# macOS or Linux
brew tap xataio/pgstream
brew install pgstream

Environment setup

If you already have an environment available with at least Postgres and the resources for whichever modules you plan to run, you can skip this step. Otherwise, a Docker setup is available in this repository that starts Postgres, Kafka and OpenSearch (as well as OpenSearch dashboards for easy visualisation).

docker-compose -f build/docker/docker-compose.yml up

Prepare the database

The init command creates the pgstream schema in the configured Postgres database, along with the tables, functions, and triggers required to keep track of schema changes. See the Tracking schema changes section for more details. It also creates a replication slot for the configured database, which the pgstream service will use.

pgstream init --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"

If there are any issues or if you want to clean up the pgstream setup, you can run the following.

pgstream tear-down --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"

This command will clean up all pgstream state.

Run pgstream

The run command requires configuration to be provided via environment variables, a config file, or a combination of both. Some sample configuration files are provided in the repo and can be used as guidelines.

Example running pgstream with Postgres -> OpenSearch:

pgstream run -c pg2os.env --log-level trace

Example running pgstream with Postgres -> Kafka, and in a separate terminal, Kafka -> OpenSearch:

pgstream run -c pg2kafka.env --log-level trace
pgstream run -c kafka2os.env --log-level trace

The run command will parse the configuration provided, and initialise the configured modules. It requires at least one listener and one processor.
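The "at least one listener and one processor" rule can be illustrated with a minimal Go sketch. This is not pgstream's implementation: the `modules` type and the `detect` and `validate` functions are hypothetical, and the sketch assumes a module counts as enabled when one of its required environment variables is non-empty. The Kafka batch writer is left out because it shares its required variables with the Kafka listener, and the OpenSearch URL in `main` is a placeholder.

```go
package main

import (
	"errors"
	"fmt"
)

// modules is a hypothetical summary of which modules a configuration enables.
type modules struct {
	Listeners  []string
	Processors []string
}

var (
	errNoListener  = errors.New("at least one listener must be configured")
	errNoProcessor = errors.New("at least one processor must be configured")
)

// detect reports which modules a set of environment variables enables.
// Assumption: a module is enabled when one of its required variables is set.
func detect(env map[string]string) modules {
	var m modules
	if env["PGSTREAM_POSTGRES_LISTENER_URL"] != "" {
		m.Listeners = append(m.Listeners, "postgres")
	}
	if env["PGSTREAM_KAFKA_READER_CONSUMER_GROUP_ID"] != "" {
		m.Listeners = append(m.Listeners, "kafka")
	}
	if env["PGSTREAM_OPENSEARCH_STORE_URL"] != "" || env["PGSTREAM_ELASTICSEARCH_STORE_URL"] != "" {
		m.Processors = append(m.Processors, "search")
	}
	if env["PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL"] != "" {
		m.Processors = append(m.Processors, "webhook")
	}
	return m
}

// validate enforces the rule that a run needs a listener and a processor.
func validate(m modules) error {
	if len(m.Listeners) == 0 {
		return errNoListener
	}
	if len(m.Processors) == 0 {
		return errNoProcessor
	}
	return nil
}

func main() {
	env := map[string]string{
		"PGSTREAM_POSTGRES_LISTENER_URL": "postgres://postgres:postgres@localhost?sslmode=disable",
		"PGSTREAM_OPENSEARCH_STORE_URL":  "http://localhost:9200", // placeholder URL
	}
	m := detect(env)
	fmt.Println(m.Listeners, m.Processors, validate(m))
}
```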

Configuration

Here's a list of all the environment variables that can be used to configure the individual modules, along with their descriptions and default values.

Listeners

Postgres Listener

| Environment Variable | Default | Required | Description |
| --- | --- | --- | --- |
| PGSTREAM_POSTGRES_LISTENER_URL | N/A | Yes | URL of the Postgres database to connect to for replication purposes. |

Kafka Listener

| Environment Variable | Default | Required | Description |
| --- | --- | --- | --- |
| PGSTREAM_KAFKA_SERVERS | N/A | Yes | URLs for the Kafka servers to connect to. |
| PGSTREAM_KAFKA_TOPIC_NAME | N/A | Yes | Name of the Kafka topic to read from. |
| PGSTREAM_KAFKA_READER_CONSUMER_GROUP_ID | N/A | Yes | Name of the Kafka consumer group for the WAL Kafka reader. |
| PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET | Earliest | No | Kafka offset from which the consumer will start if there's no offset available for the consumer group. |
| PGSTREAM_KAFKA_TLS_ENABLED | False | No | Enable TLS connection to the Kafka servers. |
| PGSTREAM_KAFKA_TLS_CA_CERT_FILE | "" | When TLS enabled | Path to the CA PEM certificate to use for Kafka TLS authentication. |
| PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE | "" | No | Path to the client PEM certificate to use for Kafka TLS client authentication. |
| PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE | "" | No | Path to the client PEM private key to use for Kafka TLS client authentication. |
| PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_INITIAL_INTERVAL | 0 | No | Initial interval for the exponential backoff policy to be applied to the Kafka commit retries. |
| PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_INTERVAL | 0 | No | Max interval for the exponential backoff policy to be applied to the Kafka commit retries. |
| PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the Kafka commit retries. |
| PGSTREAM_KAFKA_COMMIT_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the Kafka commit retries. |
| PGSTREAM_KAFKA_COMMIT_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the Kafka commit retries. |

Either an exponential or a constant backoff policy can be provided for the Kafka commit retry strategy. If neither is provided, no retries apply.
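
The choice between the two Kafka commit backoff policies can be sketched in Go as follows. This is an illustration only, not pgstream's code: the `retryConfig` type is hypothetical, and the sketch assumes that a policy counts as provided when its max retries value is non-zero, that the exponential policy takes precedence if both are set, and that the exponential interval doubles on every retry up to the max interval.

```go
package main

import (
	"fmt"
	"time"
)

// retryConfig is a hypothetical struct mirroring the five
// PGSTREAM_KAFKA_COMMIT_*BACKOFF* settings.
type retryConfig struct {
	ExpInitialInterval time.Duration
	ExpMaxInterval     time.Duration
	ExpMaxRetries      int
	ConstInterval      time.Duration
	ConstMaxRetries    int
}

// delays returns the wait before each retry attempt.
// An empty result means that no retries apply.
func (c retryConfig) delays() []time.Duration {
	switch {
	case c.ExpMaxRetries > 0: // assumption: exponential wins if both are set
		out := make([]time.Duration, 0, c.ExpMaxRetries)
		d := c.ExpInitialInterval
		for i := 0; i < c.ExpMaxRetries; i++ {
			if c.ExpMaxInterval > 0 && d > c.ExpMaxInterval {
				d = c.ExpMaxInterval // cap the interval
			}
			out = append(out, d)
			d *= 2 // assumption: doubling factor
		}
		return out
	case c.ConstMaxRetries > 0:
		out := make([]time.Duration, c.ConstMaxRetries)
		for i := range out {
			out[i] = c.ConstInterval
		}
		return out
	default:
		return nil // neither policy provided: no retries
	}
}

func main() {
	exp := retryConfig{ExpInitialInterval: time.Second, ExpMaxInterval: 4 * time.Second, ExpMaxRetries: 4}
	constant := retryConfig{ConstInterval: 2 * time.Second, ConstMaxRetries: 3}
	fmt.Println("exponential:", exp.delays())
	fmt.Println("constant:   ", constant.delays())
	fmt.Println("none:       ", retryConfig{}.delays())
}
```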

Processors

Kafka Batch Writer

| Environment Variable | Default | Required | Description |
| --- | --- | --- | --- |
| PGSTREAM_KAFKA_SERVERS | N/A | Yes | URLs for the Kafka servers to connect to. |
| PGSTREAM_KAFKA_TOPIC_NAME | N/A | Yes | Name of the Kafka topic to write to. |
| PGSTREAM_KAFKA_TOPIC_PARTITIONS | 1 | No | Number of partitions created for the Kafka topic if auto create is enabled. |
| PGSTREAM_KAFKA_TOPIC_REPLICATION_FACTOR | 1 | No | Replication factor used when creating the Kafka topic if auto create is enabled. |
| PGSTREAM_KAFKA_TOPIC_AUTO_CREATE | False | No | Auto creation of configured Kafka topic if it doesn't exist. |
| PGSTREAM_KAFKA_TLS_ENABLED | False | No | Enable TLS connection to the Kafka servers. |
| PGSTREAM_KAFKA_TLS_CA_CERT_FILE | "" | When TLS enabled | Path to the CA PEM certificate to use for Kafka TLS authentication. |
| PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE | "" | No | Path to the client PEM certificate to use for Kafka TLS client authentication. |
| PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE | "" | No | Path to the client PEM private key to use for Kafka TLS client authentication. |
| PGSTREAM_KAFKA_WRITER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to Kafka is triggered. |
| PGSTREAM_KAFKA_WRITER_BATCH_BYTES | 1572864 | No | Max size in bytes for a given batch. When this size is reached, the batch is sent to Kafka. |
| PGSTREAM_KAFKA_WRITER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to Kafka. |
| PGSTREAM_KAFKA_WRITER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the Kafka batch writer for inflight batches. |

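
The three Kafka batch writer triggers (batch timeout, batch bytes, batch size) can be sketched in Go as below. The `batcher` type is hypothetical and is not the pgstream implementation; the sketch assumes the timeout is measured from the first message of the current batch and that a batch is sent as soon as a limit is reached after adding a message.

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical accumulator with the three flush triggers.
type batcher struct {
	maxSize  int           // cf. PGSTREAM_KAFKA_WRITER_BATCH_SIZE
	maxBytes int           // cf. PGSTREAM_KAFKA_WRITER_BATCH_BYTES
	timeout  time.Duration // cf. PGSTREAM_KAFKA_WRITER_BATCH_TIMEOUT
	msgs     [][]byte
	bytes    int
	started  time.Time // arrival time of the first message in the batch
}

// add appends msg and returns the full batch when the size or bytes limit
// has been reached, or nil if the batch should keep accumulating.
func (b *batcher) add(msg []byte, now time.Time) [][]byte {
	if len(b.msgs) == 0 {
		b.started = now
	}
	b.msgs = append(b.msgs, msg)
	b.bytes += len(msg)
	if len(b.msgs) >= b.maxSize || b.bytes >= b.maxBytes {
		return b.flush()
	}
	return nil
}

// tick is meant to be called periodically; it returns the pending batch
// once the timeout has elapsed, or nil otherwise.
func (b *batcher) tick(now time.Time) [][]byte {
	if len(b.msgs) > 0 && now.Sub(b.started) >= b.timeout {
		return b.flush()
	}
	return nil
}

// flush hands over the current batch and resets the accumulator.
func (b *batcher) flush() [][]byte {
	out := b.msgs
	b.msgs, b.bytes = nil, 0
	return out
}

func main() {
	// Limits taken from the defaults in the Kafka batch writer settings.
	b := &batcher{maxSize: 100, maxBytes: 1572864, timeout: time.Second}
	start := time.Now()
	b.add([]byte("event-1"), start)
	fmt.Println("sent after 500ms:", len(b.tick(start.Add(500*time.Millisecond))))
	fmt.Println("sent after 1s:   ", len(b.tick(start.Add(time.Second))))
}
```
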
Search Batch Indexer

| Environment Variable | Default | Required | Description |
| --- | --- | --- | --- |
| PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
| PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
| PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |

Either an exponential or a constant backoff policy can be provided for the search indexer cleanup retry strategy. If neither is provided, no retries apply.

Either an exponential or a constant backoff policy can be provided for the search store retry strategy. If neither is provided, a default exponential backoff policy applies.

Webhook Notifier

| Environment Variable | Default | Required | Description |
| --- | --- | --- | --- |
| PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL | N/A | Yes | URL for the webhook subscription store to connect to. |
| PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_ENABLED | False | No | Caching applied to the subscription store retrieval queries. |
| PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_REFRESH_INTERVAL | 60s | When cache enabled | Interval at which the subscription store cache will be refreshed. Indicates max cache staleness. |
| PGSTREAM_WEBHOOK_NOTIFIER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the webhook notifier for inflight notifications. |
| PGSTREAM_WEBHOOK_NOTIFIER_WORKER_COUNT | 10 | No | Max number of concurrent workers that will send webhook notifications for a given WAL event. |
| PGSTREAM_WEBHOOK_NOTIFIER_CLIENT_TIMEOUT | 10s | No | Max time the notifier will wait for a response from a webhook URL before timing out. |
| PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_ADDRESS | ":9900" | No | Address for the subscription server to listen on. |
| PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_READ_TIMEOUT | 5s | No | Max duration for reading an entire server request, including the body before timing out. |
| PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_WRITE_TIMEOUT | 10s | No | Max duration before timing out writes of the response. It is reset whenever a new request's header is read. |

Translator

| Environment Variable | Default | Required | Description |
| --- | --- | --- | --- |
| PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL | N/A | Yes | URL of the Postgres database where the schema log table is stored. |

Tracking schema changes

One of the main differentiators of pgstream is that it tracks and replicates schema changes automatically. It relies on SQL triggers that populate a Postgres table (pgstream.schema_log) containing a history log of all DDL changes for a given schema. Whenever a schema change occurs, a trigger inserts a new row into the schema log table with the schema encoded as a JSON value. The table thus forms a linearised change log, which the pgstream pipeline parses to identify modifications and push the relevant changes downstream.
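
To make the idea of identifying modifications from the schema log more concrete, here is a minimal Go sketch that compares two consecutive JSON-encoded schema snapshots. The JSON shape (`tables`, `columns`, `name`, `type`) and the `diff` helper are hypothetical and chosen only for illustration; the actual format is defined by the SQL in the migrations folder.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// The types below describe a HYPOTHETICAL snapshot format, not the real one.
type column struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type table struct {
	Name    string   `json:"name"`
	Columns []column `json:"columns"`
}

type schemaSnapshot struct {
	Tables []table `json:"tables"`
}

// columnSet parses a snapshot and flattens it into "table.column" keys.
func columnSet(raw string) (map[string]bool, error) {
	var s schemaSnapshot
	if err := json.Unmarshal([]byte(raw), &s); err != nil {
		return nil, err
	}
	set := make(map[string]bool)
	for _, t := range s.Tables {
		for _, c := range t.Columns {
			set[t.Name+"."+c.Name] = true
		}
	}
	return set, nil
}

// diff returns the columns added and removed between two snapshots, sorted.
func diff(oldRaw, newRaw string) (added, removed []string, err error) {
	oldSet, err := columnSet(oldRaw)
	if err != nil {
		return nil, nil, err
	}
	newSet, err := columnSet(newRaw)
	if err != nil {
		return nil, nil, err
	}
	for k := range newSet {
		if !oldSet[k] {
			added = append(added, k)
		}
	}
	for k := range oldSet {
		if !newSet[k] {
			removed = append(removed, k)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed, nil
}

func main() {
	before := `{"tables":[{"name":"users","columns":[{"name":"id","type":"int"}]}]}`
	after := `{"tables":[{"name":"users","columns":[{"name":"id","type":"int"},{"name":"email","type":"text"}]}]}`
	added, removed, err := diff(before, after)
	fmt.Println(added, removed, err)
}
```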

The detailed SQL used can be found in the migrations folder.

Schema and data changes are part of the same linear stream, so downstream consumers always observe a schema change before any data that relies on the new schema arrives. This prevents data loss and removes the need for manual intervention.
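
The ordering guarantee can be illustrated with a small Go sketch of a downstream consumer. The `event` and `consumer` types are hypothetical and much simpler than real WAL events; the point is only that applying events in stream order means a data event never references a column the consumer has not yet seen.

```go
package main

import "fmt"

// event is a hypothetical, simplified stream entry.
type event struct {
	Kind    string            // "schema" or "data"
	Table   string            // table the event refers to
	Columns []string          // schema events: full list of column names
	Row     map[string]string // data events: column name -> value
}

// consumer tracks the latest known columns for each table.
type consumer struct {
	known map[string]map[string]bool
}

// apply processes one event. Data events fail if they use unknown columns.
func (c *consumer) apply(e event) error {
	switch e.Kind {
	case "schema":
		cols := make(map[string]bool, len(e.Columns))
		for _, name := range e.Columns {
			cols[name] = true
		}
		if c.known == nil {
			c.known = make(map[string]map[string]bool)
		}
		c.known[e.Table] = cols
		return nil
	case "data":
		for name := range e.Row {
			if !c.known[e.Table][name] {
				return fmt.Errorf("column %q of table %q is not known yet", name, e.Table)
			}
		}
		return nil
	default:
		return fmt.Errorf("unknown event kind %q", e.Kind)
	}
}

func main() {
	// The schema change adding "email" precedes the row that uses it.
	stream := []event{
		{Kind: "schema", Table: "users", Columns: []string{"id"}},
		{Kind: "data", Table: "users", Row: map[string]string{"id": "1"}},
		{Kind: "schema", Table: "users", Columns: []string{"id", "email"}},
		{Kind: "data", Table: "users", Row: map[string]string{"id": "2", "email": "a@example.com"}},
	}
	c := &consumer{}
	for i, e := range stream {
		if err := c.apply(e); err != nil {
			fmt.Println("event", i, "failed:", err)
			return
		}
	}
	fmt.Println("all events applied in order")
}
```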

Architecture

pgstream is constructed as a streaming pipeline, where data from one module streams into the next, eventually reaching the configured output plugins. It keeps track of schema changes and replicates them along with the data changes to ensure a consistent view of the source data downstream. This modular approach makes adding and integrating output plugin implementations simple and painless.

pgstream architecture v1

At a high level the implementation is split into WAL listeners and WAL processors.

WAL Listener

A listener is anything that listens for WAL data, regardless of the source. It has a single responsibility: consume and manage the WAL events, delegating the processing of those entries to modules that form the processing pipeline. Depending on the listener implementation, it might be required to also have a checkpointer to flag the events as processed once the processor is done.

There are currently two implementations of the listener, matching the listeners in the Configuration section:

  • Postgres listener
  • Kafka listener

WAL Processor

A processor processes a WAL event. Depending on the implementation, it might also be required to checkpoint the event once it has finished processing it, as described above.
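
The split between listener, processor, and checkpointer can be sketched with a few Go types. Everything here (`walEvent`, `processor`, `checkpointer`, `listen`, `demo`) is a hypothetical simplification for illustration and does not reproduce pgstream's interfaces; it assumes the listener checkpoints an event only after the processor has handled it successfully.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// walEvent is a hypothetical, simplified WAL event.
type walEvent struct {
	Position string // opaque position used for checkpointing
	Data     string
}

// processor handles a single WAL event.
type processor interface {
	ProcessWALEvent(ctx context.Context, e walEvent) error
}

// checkpointer flags a position as processed.
type checkpointer func(ctx context.Context, position string) error

// funcProcessor adapts a plain function to the processor interface.
type funcProcessor func(ctx context.Context, e walEvent) error

func (f funcProcessor) ProcessWALEvent(ctx context.Context, e walEvent) error {
	return f(ctx, e)
}

// listen is a toy listener: it delegates each event to the processor and
// checkpoints it only once processing has succeeded.
func listen(ctx context.Context, events []walEvent, p processor, cp checkpointer) error {
	for _, e := range events {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := p.ProcessWALEvent(ctx, e); err != nil {
			return fmt.Errorf("processing event at %s: %w", e.Position, err)
		}
		if err := cp(ctx, e.Position); err != nil {
			return fmt.Errorf("checkpointing %s: %w", e.Position, err)
		}
	}
	return nil
}

// demo runs the listener with a processor that fails on the event whose
// Data equals failOn, and returns the positions that were checkpointed.
func demo(events []walEvent, failOn string) ([]string, error) {
	var checkpointed []string
	p := funcProcessor(func(_ context.Context, e walEvent) error {
		if e.Data == failOn {
			return errors.New("processor failed")
		}
		return nil
	})
	cp := func(_ context.Context, position string) error {
		checkpointed = append(checkpointed, position)
		return nil
	}
	err := listen(context.Background(), events, p, cp)
	return checkpointed, err
}

func main() {
	events := []walEvent{{"0/1", "insert"}, {"0/2", "update"}, {"0/3", "bad"}}
	checkpointed, err := demo(events, "bad")
	fmt.Println(checkpointed, err)
}
```

Because the failing event is never checkpointed in this sketch, a restarted listener could resume from the last checkpointed position and retry it.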

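There are currently three implementations of the processor, matching the processors in the Configuration section:

  • Kafka batch writer
  • Search batch indexer
  • Webhook notifier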

In addition to the implementations described above, there's an optional processor decorator, the translator, that injects some of the pgstream logic into the WAL event. This includes:

Limitations

Some of the limitations of the initial release include:

Glossary

Contributing

We welcome contributions from the community! If you'd like to contribute to pgstream, please follow these guidelines:

Contributing Code

  1. Fork the repository.
  2. Create a new branch for your feature or bug fix.
  3. Make your changes and write tests if applicable.
  4. Ensure your code passes linting and tests.
    • There's a pre-commit configuration available in the root directory (.pre-commit-config.yaml), which can be used to run the CI checks locally.
  5. Submit a pull request.

For this project, we pledge to act and interact in ways that contribute to an open, welcoming, diverse, inclusive, and healthy community.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Support

If you have any questions, encounter issues, or need assistance, open an issue in this repository or join our Discord, and our community will be happy to help.


Made with ❤️ by Xata 🦋