transferwise / pipelinewise-tap-postgres

Singer.io Tap for PostgreSQL - PipelineWise compatible
https://transferwise.github.io/pipelinewise/
GNU Affero General Public License v3.0

Notice

To better serve Wise business and customer needs, the PipelineWise codebase needs to shrink. We have made the difficult decision that, going forward, many components of PipelineWise will be removed or incorporated into the main repo. The last version before this decision is v0.64.1.

We thank everyone in the open-source community who, over the past 6 years, has helped make PipelineWise a robust product for heterogeneous replication of many terabytes of data daily.

pipelinewise-tap-postgres

PyPI version PyPI - Python Version License: MIT

Singer tap that extracts data from a PostgreSQL database and produces JSON-formatted data following the Singer spec.

This is a PipelineWise compatible tap connector.

How to use it

The recommended method of running this tap is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files, and most things are automated. Please check the related documentation at Tap Postgres.

If you want to run this Singer Tap independently please read further.

Install and Run

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  . venv/bin/activate
  pip install pipelinewise-tap-postgres

or

  make venv

Create a config.json

{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "password": "secret",
  "dbname": "db"
}

These are the same basic configuration properties used by the PostgreSQL command-line client (psql).

Full list of options in config.json:

| Property | Type | Required? | Default | Description |
|---|---|---|---|---|
| host | String | Yes | - | PostgreSQL host |
| port | Integer | Yes | - | PostgreSQL port |
| user | String | Yes | - | PostgreSQL user |
| password | String | Yes | - | PostgreSQL password |
| dbname | String | Yes | - | PostgreSQL database name |
| filter_schemas | String | No | None | Comma-separated schema names. Only the listed schemas are scanned, which improves the performance of data extraction. |
| ssl | String | No | None | If set to "true", use SSL via the postgres sslmode require option. If the server does not accept SSL connections or the client certificate is not recognized, the connection will fail. |
| logical_poll_total_seconds | Integer | No | 10800 | Stop running the tap when no data has been received from the WAL for this number of seconds. |
| break_at_end_lsn | Boolean | No | true | Stop running the tap if the newly received LSN is after the max LSN that was detected when the tap started. |
| max_run_seconds | Integer | No | 43200 | Stop running the tap after this number of seconds. |
| debug_lsn | String | No | None | If set to "true", add the _sdc_lsn property to the Singer messages to debug the postgres LSN position in the WAL stream. |
| tap_id | String | No | None | ID of the pipeline/tap |
| itersize | Integer | No | 20000 | Size of the PG cursor iterator when doing INCREMENTAL or FULL_TABLE |
| default_replication_method | String | No | None | Default replication method to use when none is provided in the catalog (Values: LOG_BASED, INCREMENTAL or FULL_TABLE) |
| use_secondary | Boolean | No | False | Use a database replica for INCREMENTAL and FULL_TABLE replication |
| secondary_host | String | No | - | PostgreSQL replica host (required if use_secondary is True) |
| secondary_port | Integer | No | - | PostgreSQL replica port (required if use_secondary is True) |
| limit | Integer | No | None | Adds a limit to INCREMENTAL queries to limit the number of records returned per run |
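
The required and conditional properties listed above can be checked before the tap is started. The following is a minimal sketch and not part of the tap itself: the function name validate_config is hypothetical, and it enforces only the rules stated in the options list (the five required properties, the secondary_* properties when use_secondary is set, and the allowed values of default_replication_method).

```python
# Hypothetical pre-flight check for config.json; not part of tap-postgres.

# Properties the options list marks as required.
REQUIRED = ("host", "port", "user", "password", "dbname")
# Values the options list gives for default_replication_method.
REPLICATION_METHODS = ("LOG_BASED", "INCREMENTAL", "FULL_TABLE")


def validate_config(config):
    """Return a list of problems found in a config dict (empty if none)."""
    problems = [
        f"missing required property: {key}"
        for key in REQUIRED
        if key not in config
    ]

    # secondary_host and secondary_port are required when use_secondary is set.
    if config.get("use_secondary"):
        for key in ("secondary_host", "secondary_port"):
            if key not in config:
                problems.append(f"{key} is required when use_secondary is true")

    method = config.get("default_replication_method")
    if method is not None and method not in REPLICATION_METHODS:
        problems.append(f"unknown default_replication_method: {method}")

    return problems


# Example: the basic config from above, with a replica requested but not described.
example = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "secret",
    "dbname": "db",
    "use_secondary": True,
}
for problem in validate_config(example):
    print(problem)
```

To check a real file, load it with json.load and pass the resulting dict to the function.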

Run the tap in Discovery Mode

tap-postgres --config config.json --discover                # Should dump a Catalog to stdout
tap-postgres --config config.json --discover > catalog.json # Capture the Catalog

Add Metadata to the Catalog

Each entry under the Catalog's "streams" key will need the following metadata:

{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true,
          "replication-method": "LOG_BASED"
        }
      }]
    }
  ]
}

The replication method can be one of FULL_TABLE, INCREMENTAL or LOG_BASED.
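
Editing a large catalog by hand is error-prone, so the metadata can also be added with a short script. This is a sketch under stated assumptions: the function name select_streams is hypothetical, it selects every stream in the catalog, and it sets only the two properties shown above on the metadata entry whose breadcrumb is empty.

```python
import json


def select_streams(catalog, replication_method="LOG_BASED"):
    """Mark every stream in a catalog dict as selected.

    Sets "selected" and "replication-method" on the stream-level metadata
    entry (the one with an empty breadcrumb), creating it if it is absent.
    """
    for stream in catalog.get("streams", []):
        entries = stream.setdefault("metadata", [])
        top = next((e for e in entries if e.get("breadcrumb") == []), None)
        if top is None:
            top = {"breadcrumb": [], "metadata": {}}
            entries.append(top)
        top.setdefault("metadata", {}).update(
            {"selected": True, "replication-method": replication_method}
        )
    return catalog


# Example on an in-memory catalog with one stream and no metadata yet.
catalog = {"streams": [{"stream_name": "my_topic"}]}
print(json.dumps(select_streams(catalog), indent=2))
```

For a real run, load catalog.json with json.load, pass it through the function, and write the result back with json.dump.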

Note: Log-based replication requires a few adjustments in the source postgres database. Please read further for more information.

Run the tap in Sync Mode

tap-postgres --config config.json --catalog catalog.json

The tap will write bookmarks to stdout which can be captured and passed as an optional --state state.json parameter to the tap for the next sync.
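
Capturing the bookmarks means keeping the value of the last STATE message the tap printed. The sketch below assumes the tap's stdout was saved as one JSON message per line, as the Singer spec describes, with STATE messages carrying their payload under "value"; the function name last_state is hypothetical.

```python
import json


def last_state(lines):
    """Return the value of the final STATE message in Singer output lines."""
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # Skip anything that is not a JSON message.
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state


# Example with made-up output lines; the bookmark contents are illustrative only.
output = [
    '{"type": "RECORD", "stream": "my_topic", "record": {"id": 1}}',
    '{"type": "STATE", "value": {"bookmarks": {"my_topic": {"version": 1}}}}',
    '{"type": "RECORD", "stream": "my_topic", "record": {"id": 2}}',
    '{"type": "STATE", "value": {"bookmarks": {"my_topic": {"version": 2}}}}',
]
print(json.dumps(last_state(output)))
```

Writing the returned dict to state.json with json.dump gives a file that can be passed to the next run via --state state.json.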

Log Based replication requirements

To run tests:

  1. Install python test dependencies in a virtual env:

    make venv

  2. You need a postgres database to run the tests, and you need to export its credentials.

     You can make use of the local docker-compose to spin up a test database by running make start_db

     Test objects will be created in the postgres database.

  3. To run the unit tests:

    make unit_test

  4. To run the integration tests:

    make integration_test

To run pylint:

Install the python dependencies and run the python linter:

  make venv
  make pylint