estuary / connectors

Connectors for capturing data from external data sources
Other
43 stars 11 forks source link

PostgreSQL Source Connector #22

Closed willdonnelly closed 2 years ago

willdonnelly commented 3 years ago

We need a connector which can continuously source changes from PostgreSQL via Logical Replication.

There's an Airbyte PostgreSQL connector, and it even supports replication, but it operates in a polling mode where the connector gets re-run periodically to collect new changes and then shuts down, and making it run continuously appears nontrivial.

But since that connector exists, that means that this one can be focused solely on the low-latency replication side of things, which simplifies the problem a bit.

willdonnelly commented 2 years ago

I've been delinquent in updating this issue with development progress, but development has been taking place on the wgd/postgres-source-connector branch. I went back over my notes and will backfill some status updates and major decisions here.

Monday, 23 August 2021

I took an initial stab at writing a PostgreSQL Logical Replication consumer in Go using github.com/jackc/pgx and github.com/jackc/pglogrepl. It doesn't currently implement any of the table-scanning logic, just starts replication and translates change events into Airbyte-spec-compatible JSON messages.

Tuesday, 24 August 2021

Discussed state messages and commits and acknowledgement with Johnny. Takeaways are that we should make sure to only emit state messages at consistent points in our output, which in this case means whenever there are no open transactions in the PostgreSQL replication event stream.

The other thing we discussed was how to handle "acknowledgement" from the connector back to PostgreSQL. See there's a "replication slot" which is basically a named cursor into the WAL (the restart_lsn of the slot), and it gets advanced based on the WALFlushPosition field of our StandbyStatusUpdate messages to the DB. But once we've advanced that cursor we can't rewind it back, so we need to only advance our WALFlushPosition once we're confident that we'll never be restarted at an earlier state.

Unfortunately there is no reliable heuristics about how far back "in the past" the connector might get restarted, and no mechanism in the Airbyte connector protocol which would explicitly tell us that. The only thing we can know for certain is that after a connector restart, the state we restarted at has been committed and we're not going to need to rewind back past that state.

Thus if we only ever set WALFlushPosition to the CurrentLSN provided as part of our restart state, then we'll always be correct but at the cost of potentially holding onto more WAL data than would otherwise be necessary. This assumes that the connector will get killed and restarted "every so often" however.

Thursday, 26 August 2021

I've got the whole "scan tables then begin replication" process working all right, but there's an obvious problem which will occur when adding a new table to the catalog which is already so large that the scan takes hours or days. If we can't be restarted partway through the table scan without repeating the entire thing, then this could lead to an unbounded period of "stuckness" in which the connector starts scanning the table, gets killed and restarted, and repeats.

After discussing this in the morning standup, the consensus was that we'll need chunked table scans, but that we probably don't need to implement interleaving between replication streaming and table-scanning. If a user wants to add a new table, and it's so large that the scan will take hours, and they can't tolerate that much of an interruption to existing streams, then one or the other stream should probably be a separate connector instance anyway, just for isolation.

Friday, 27 August 2021

I spent some time reading through the PostgreSQL ReorderBuffer and pgoutput code to properly understand the semantics of the messages we receive, and what invariants we can rely on. Some takeaways:

I'm still working on chunked table scans, I tried to do something clever using ctid or xmin system columns, but at this point I'm convinced that I'll have to just use the primary key of the table for scanning, because nothing else allows replication change events to be reliably mapped back to the appropriate 'scan range' of the table.

willdonnelly commented 2 years ago

Wednesday, 1 September 2021

One issue with chunked table scans using the primary key of an arbitrary table is that we need to be able to generically handle multiple-column primary keys with various data types for each column, and the comparisons thereof. It was recommended to me that the FoundationDB tuple encoding (which is used in other bits of Flow code already) might help solve this problem.

The FoundationDB tuple encoding gives us a way of serializing ordered tuples of heterogenous data to byte strings, and later deserializing them without having to separately keep track of the expected type of each field. Furthermore, the lexicographical ordering of these encoded bytes should match the ordering of the equivalent tuples. I'll need to validate that the ordering property actually matches between FoundationDB serialization and PostgreSQL ORDER BY clauses, but so long as that's the case this is a perfect solution.

In other topics, somebody asked in today's standup how exactly resuming replication from an arbitrary LSN works, and whether we can rely on that. The short answer is that PostgreSQL's ReorderBuffer logic takes care of most of the complexity for us, and actually starts decoding the WAL from the restart_lsn of the replication slot (which due to how it gets updated is known to be "safe") and just holds off on sending us the decoded events until the requested LSN is reached.

Tuesday, 7 September 2021

I've added an opportunistic check during the table scanning process which simply encodes each row's primary key as a FoundationDB serialized tuple, and verifies that these tuples appear in ascending order. This check failing would be bad, as it would indicate that the scan range comparisons may not be correct, which would result in replication change events being filtered when they shouldn't (or vice versa).

Thursday, 9 September 2021

Yesterday I started making the tests/run.sh integration test work with the Postgres connector, but had some trouble with how primary keys are specified and/or figured out. After discussing this in the morning standup I've changed how this works in two ways:

  1. By default, the "primary key" used for table-scanning and event filtering will be obtained by simply querying the database for its primary key.
  2. Ignoring what the Airbyte spec says about its usage, if the primary_key property is specified on a stream in the ConfiguredCatalog we'll use that instead of the DB's native primary key.

For context, the "primary key" is only used for chunking the initial table scan, and then later for mapping replicated change events to specific table chunks for LSN-based filtering. There is no technical reason that this has to be the same as the primary key of the underlying table, although the key used for scanning must be reasonably unique and scanning might be less efficient if there's no suitable index. So defaulting to the native primary key of the table while allowing an override seems like the best option.

Friday, 10 September 2021

I refactored the code a little bit and implement some go test-able test cases. The basic pattern here is that I'm running the connector in "polling" mode (which means merely that it shuts itself down after no replication events have arrived for a while), with output to a special testing-only 'Encoder' which buffers the messages in memory rather than JSON-encoding and writing them to stdout.

After the capture terminates of its own accord, the sequence of records and state updates is sanitized of their LSNs and timestamps (so that output will be identical between runs of "the same" behavior) and the result is compared to a golden snapshot.

There's some additional plumbing which keeps track of the sequence of states so we can test resuming partway through a capture, and a bunch of test helper functions so that complex scenarios like "create a test table, insert some rows, run the capture and verify the results against the snapshot, make a few more changes, then run a second capture and verify that snapshot as well" can be expressed concisely.

willdonnelly commented 2 years ago

Tuesday, 14 September 2021

I've finished implementing all of the remaining test cases on my list. Code coverage stats indicate that most connector logic is hit by at least one test in the suite, with the exception of a bunch of error reporting paths that probably aren't worth the effort of testing since they're basically all if err != nil { return errors.Wrap(err, "failed to do thing") }.

One test in particular consumed most of my day, that being TestSlotLSNAdvances. This is kind of an important test, since it verifies that our approach to updating the restart_lsn of the replication slot actually works, which is a necessity for any sort of long-running capture (if the replication slot fails to advance it will cause the DB to retain old WAL records indefinitely and also take longer and longer to decode WAL contents each time the connector restarts).

Wednesday 15 September 2021

After I spent yesterday digging into exactly when and how the restart_lsn gets updated, I'm feeling pretty leery of the whole "wait until a connector restart to update it" approach. We already knew this would result in a bunch of extra WAL contents being held onto for hours or days until the connector restarts, and that's already suboptimal, but now I understand that START_REPLICATION doesn't actually seek to the requested LSN, it just starts decoding the WAL from restart_lsn and starts sending output once the requested point is reached. This could theoretically lead to some awful "hiccups" in replication latency after a connector restart.

Furthermore, PostgreSQL isn't the only time this is going to be a problem. We're likely to run into the same sort of concerns any time we want to integrate with a system that wants positive acknowledgement that some data has been processed, for instance I think somebody in the standup mentioned RabbitMQ as a case where we'll definitely need to acknowledge message receipt.

I'm going to finish work on the connector as-is for now, since the current "set commit LSN after restart" behavior is still the correct thing to do upon restart and acknowledgement support would mostly be some added logic regardless of when I write it.

Later I'll sketch out a design for adding acknowledgement feedback to the connector protocol and come back to add it to this connector.

willdonnelly commented 2 years ago

I finally went back and fleshed out the data type support. This includes both making discovery work (translating PostgreSQL table column types into JSON schema describing the connector's output stream) and adding a tiny bit of validation that we can insert values of each type into a table and get scanning/replication records back with the expected contents.

The set of data types is nearly complete now, but there's ample room for bikeshedding in a lot of places here. For instance:

There's probably others too, those are just the ones that jumped out at me.

willdonnelly commented 2 years ago

In today's standup it was mentioned that customers with huge (>4TiB) preexisting datasets are probably a real concern, and that we probably will need to figure out how to interleave the scanning of preexisting data with replication event processing.

One major chunk of work in that direction is already done, with the table-scanning process divided up into chunks so that it can be resumed across restarts. In a very handwavy sense, all we need to do is run replication and the chunked table-scanning concurrently, with some tweaks to the event filtering logic and maybe pausing replication briefly while each chunk is scanned.

Some issues that occur to me with this approach:

  1. We will only start getting replication events for some range of a table after the initial scan reaches that point. Contrast this with Netflix's DBLog, which solves much the same problem in the opposite way -- replication events are always emitted, and table scan results get filtered to make sure they don't "overwrite" newer changes. All else being equal it seems preferable to prioritize the most recent changes (that is, filter the table scan events rather than the replicated ones), but the tradeoff here is that we could see DELETE and UPDATE changes for rows that haven't yet been INSERTed in the connector output.
  2. If each chunk of the table is scanned at a different point in time, then the size of the state JSON could get absurdly large. We would definitely need some way to "discard" or "merge" the tracking of older chunks once we're sufficiently far past the point at which they were scanned, so once the steady-state is reached the state is just a pair of CurrentLSN plus a property that tells us that the entire table has been scanned a while ago and not to do any filtering.

I'm tempted to just say that DBLog has the right idea and just change over to doing that, accepting the potential for a DELETE or UPDATE to be the first time we learn about a row. However I need to think some more about how the low/high watermark concept translates, specifically whether we can get the same effect using LSNs if we don't mind being PostgreSQL specific.

willdonnelly commented 2 years ago

Note to self: Moving things to 'Done' on the big board doesn't actually close the issue.