airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Postgres Incremental sync skips rows after failure #12821

Closed adam-bloom closed 2 years ago

adam-bloom commented 2 years ago
## Environment

- **Airbyte version**: 0.36.6-alpha
- **OS Version / Instance**: AWS Linux
- **Deployment**: kubernetes
- **Source Connector and version**: postgres 0.4.14
- **Destination Connector and version**: Redshift 0.3.32
- **Severity**: Medium
- **Step where error happened**: Sync job

## Current Behavior

Sync attempt 0 fails.
Sync retry 1 succeeds.
Sync attempt 0 advanced the incremental cursor.
Sync attempt 1 skipped rows for incremental tables and was marked successful.

## Expected Behavior

Incremental cursors should not be advanced by failed attempts. If the sync cannot recover automatically, then the sync itself should fail in a way that indicates to the user that the source needs to be reset to recover.

## Logs

*If applicable, please upload the logs from the failing operation. For sync jobs, you can download the full logs from the UI by going to the sync attempt page and clicking the download logs button at the top right of the logs display window.*

Failed attempt: [logs-2048.txt](https://github.com/airbytehq/airbyte/files/8682302/logs-2048.txt)
Successful retry: [logs-2048 (1).txt](https://github.com/airbytehq/airbyte/files/8682303/logs-2048.1.txt)

## Steps to Reproduce

1. Start a sync job that uses postgres logical replication
2. Force a failure (not sure if type/timing matters)
3. Observe that recently added rows are skipped on the retry

## Are you willing to submit a PR?

This one is probably out of my depth...

grishick commented 2 years ago

Hey team! Please add your planning poker estimate with ZenHub @subodh1810 @tuliren @edgao

subodh1810 commented 2 years ago

@adam-bloom I am a bit confused with this part

Sync attempt 0 failures.
Sync retry 1 succeeds.
Sync attempt 0 advanced the incremental cursor.
Sync attempt 1 skipped rows for incremental tables and was marked successful.

If sync attempt 0 failed, the state would not be saved in the database and we would try to sync from the previous offset. The logs that you shared for the failed attempt mention that

2022-05-11 21:01:39 INFO i.a.w.DefaultReplicationWorker(run):250 - Source did not output any state messages
adam-bloom commented 2 years ago

@subodh1810 you're correct - it will attempt to start from the previous offset. However, since it already consumed those messages (and advanced the replication slot on the source), it cannot restart successfully. Seen later in attempt 1:

2022-05-11 21:02:59 source > 2022-05-11 21:02:59 INFO i.d.c.p.c.WalPositionLocator(resumeFromLsn):90 - Received COMMIT LSN 'LSN{0/89FF3698}' larger than than last stored commit LSN 'LSN{0/89DAC910}'

The messages between those offsets, consumed during attempt 0, are lost to airbyte. They need to either not be lost or be flagged somehow.
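To make the size of that gap concrete, here is a minimal sketch that parses the two LSNs from the log line and computes the distance between them. A Postgres LSN is written as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit WAL position. The class and method names (`LsnGapCheck`, `parseLsn`, `skippedBytes`) are hypothetical and are not part of Airbyte or Debezium; the reading of the gap as skipped changes follows the interpretation in the comment above.

```java
/** Illustrative helper for comparing Postgres LSNs; not Airbyte or Debezium code. */
final class LsnGapCheck {

    /** Parses the textual form "hi/lo" (two hex numbers) into a 64-bit WAL position. */
    static long parseLsn(String text) {
        String[] parts = text.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        long hi = Long.parseUnsignedLong(parts[0], 16);
        long lo = Long.parseUnsignedLong(parts[1], 16);
        return (hi << 32) | lo;
    }

    /**
     * Returns how many bytes of WAL lie between the saved state and the first
     * position the slot can still serve, or 0 if the slot is not ahead.
     */
    static long skippedBytes(String savedLsn, String firstAvailableLsn) {
        long gap = parseLsn(firstAvailableLsn) - parseLsn(savedLsn);
        return Math.max(gap, 0);
    }

    public static void main(String[] args) {
        // LSNs taken from the attempt 1 log line quoted above.
        long skipped = skippedBytes("0/89DAC910", "0/89FF3698");
        if (skipped > 0) {
            System.out.println("Slot is ahead of saved state by " + skipped
                    + " bytes of WAL; resuming would skip those changes");
        }
    }
}
```

A check along these lines could be used to fail the attempt loudly instead of marking it successful, which is what the issue's "Expected Behavior" asks for.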

fmmoret commented 2 years ago

https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-snapshots:~:text=Record%20the%20successful%20completion%20of%20the%20snapshot%20in%20the%20connector%20offsets.

Based on this documentation for debezium, it sounds like LSNs get moved forward inside the CDC slot / source database when debezium closes?

If that's true, that could explain this behavior right?

I would hope that debezium only updates LSNs on start-up based on the LSN that was handed to it -- would make this scenario impossible. But if it updates LSNs on close, that might be the problem.


Example scenario if LSN updates in the CDC database on close:

Run 1: Source gets saved state saying to crawl from offset of 1, consumes to offset of 10, updates consumed lsn in source to 10, exits. Destination errors while consuming records but after source has closed. No state is saved.

Run 2: Source gets saved state saying to crawl from offset of 1, only sees messages starting at offset of 10 because the rest were collected -- consumes up to offset of 20, updates consumed lsn in source to 20, exits.

Destination succeeds, saves finished offset to 20. Everything looks great but unsaved events between offset 1 to offset 10 never get seen again.
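The two runs above can be replayed with a toy model. This is only a sketch of the hypothesis in this comment, assuming the slot is confirmed as soon as the source has read a batch and regardless of what the destination does; offsets are plain integers and every name (`EagerConfirmSimulation`, `sync`, `lostOffsets`) is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the scenario above; hypothetical names, not Airbyte or Debezium code. */
final class EagerConfirmSimulation {
    private final long initialState;
    long slotConfirmed; // position the source database has been told is consumed
    long savedState;    // position persisted after the last successful sync
    final List<Long> delivered = new ArrayList<>();

    EagerConfirmSimulation(long start) {
        initialState = start;
        slotConfirmed = start;
        savedState = start;
    }

    /** Runs one attempt that reads every change up to {@code head}. */
    void sync(long head, boolean destinationFails) {
        // The slot cannot serve anything at or before its confirmed position.
        long from = Math.max(savedState, slotConfirmed);
        List<Long> batch = new ArrayList<>();
        for (long offset = from + 1; offset <= head; offset++) {
            batch.add(offset);
        }
        // Assumption under test: the slot is confirmed before the destination outcome is known.
        slotConfirmed = Math.max(slotConfirmed, head);
        if (destinationFails) {
            return; // batch is discarded and no state is saved
        }
        delivered.addAll(batch);
        savedState = head;
    }

    /** Offsets covered by the saved state that never reached the destination. */
    List<Long> lostOffsets() {
        List<Long> lost = new ArrayList<>();
        for (long offset = initialState + 1; offset <= savedState; offset++) {
            if (!delivered.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        EagerConfirmSimulation sim = new EagerConfirmSimulation(1);
        sim.sync(10, true);   // Run 1: destination errors, state stays at 1
        sim.sync(20, false);  // Run 2: succeeds, state jumps to 20
        System.out.println("Saved state: " + sim.savedState);
        System.out.println("Lost offsets: " + sim.lostOffsets());
    }
}
```

In this model the second run reports success with state 20 while the offsets after 1 and up to 10 were never delivered, which matches the symptom in the issue.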


I think airbyte may even be using the AlwaysCommitOffsetPolicy https://sourcegraph.com/github.com/debezium/debezium/-/blob/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java?L27 and flushing offsets to sources on a regular cadence without communication w/ the destination to confirm write.

Use in debezium base: https://sourcegraph.com/github.com/airbytehq/airbyte/-/blob/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java?L71

and with 1s flush interval: https://sourcegraph.com/github.com/airbytehq/airbyte/-/blob/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java?L130


Is there a way to 1) only flush on start-up of the source based on the passed in LSN? or 2) avoid flushing until some other signal propagates back from the destination container to the source container confirming committed writes up to a point?
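As a rough sketch of option 1, the toy model below confirms the slot only at start-up, and only up to the saved state, so a failed attempt leaves the unread changes in the slot. Offsets are plain integers and all names (`ConfirmOnStartupSimulation`, `sync`) are hypothetical. The return value counts how many changes the slot still retains when an attempt ends, since that is the cost of this approach.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "confirm only on start-up"; hypothetical names, not Airbyte or Debezium code. */
final class ConfirmOnStartupSimulation {
    long slotConfirmed; // position the source database has been told is consumed
    long savedState;    // position persisted after the last successful sync
    final List<Long> delivered = new ArrayList<>();

    ConfirmOnStartupSimulation(long start) {
        slotConfirmed = start;
        savedState = start;
    }

    /**
     * Runs one attempt that reads every change up to {@code head} and returns
     * the number of changes the slot still retains when the attempt ends.
     */
    long sync(long head, boolean destinationFails) {
        // Confirm only what the saved state proves the destination has written.
        slotConfirmed = Math.max(slotConfirmed, savedState);
        List<Long> batch = new ArrayList<>();
        for (long offset = slotConfirmed + 1; offset <= head; offset++) {
            batch.add(offset);
        }
        if (!destinationFails) {
            delivered.addAll(batch);
            savedState = head;
        }
        // Nothing read in this attempt is confirmed until the next start-up.
        return head - slotConfirmed;
    }

    public static void main(String[] args) {
        ConfirmOnStartupSimulation sim = new ConfirmOnStartupSimulation(1);
        long retainedAfterFailure = sim.sync(10, true);
        long retainedAfterSuccess = sim.sync(20, false);
        System.out.println("Retained after failed run: " + retainedAfterFailure);
        System.out.println("Retained after successful run: " + retainedAfterSuccess);
        System.out.println("Delivered: " + sim.delivered);
    }
}
```

With the same two runs (fail at 10, succeed at 20), every offset after 1 is delivered, but the slot keeps all of them until the following sync starts.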

I could be totally missing some sneaky setting somewhere that makes all of this okay

adam-bloom commented 2 years ago

@fmmoret I trust your analysis of why airbyte is flushing offsets at a regular cadence. It's actually quite easy to see while a sync is in progress, at least on a database with a significant replication slot lag. (We have a few sources that accumulate hundreds of MBs within 30 minutes or so.) You'll see the lag slowly decrease, not drop to zero at the end.

Is there a way to

only flush on start-up of the source based on the passed in LSN?

This would result in holding onto the WAL segments at the source until the next sync. Seems inefficient. Ideally, this should be committed back to the source during the end of the sync.

avoid flushing until some other signal propagates back from the destination container to the source container confirming committed writes up to a point?

The destination does not send any messages to the source container. In fact, the current airbyte architecture doesn't support any inbound messaging to the source container.

So, it's definitely not a simple problem to solve. There are some alternative approaches that put the burden on the destination to be able to retry failures (especially destinations that use some sort of data staging) with the existing source data. That of course is a decision for the airbyte team to make.

fmmoret commented 2 years ago

There are some alternative approaches that put the burden on the destination to be able to retry failures (especially destinations that use some sort of data staging) with the existing source data

That assumes there are no machine failures / OOM events / OS killing processes type things going on. I think the system should be tolerant of unforeseen outages / drops / machine swaps or restarts / etc.

This would result in holding onto the WAL segments at the source until the next sync. Seems inefficient. Ideally, this should be committed back to the source during the end of the sync.

I'd argue that correct or lossless >>> efficient in importance when it comes to data replication. FWIW, I think there are inefficiencies -- but for a different reason. I don't think buffering the WAL on a discontinuous basis is bad & the postgres background worker will actually be doing less work GCing logs on a constant basis, BUT IIRC, debezium does hold transactions for some connectors (not sure if duration is tied to CDC commit) and I could see those getting more expensive in some cases.

The destination does not send any messages to the source container. In fact, the current airbyte architecture doesn't support any inbound messaging to the source container.

Agree that it's probably non-trivial. Flush only on start-up would make it correct and is probably easier to fix quickly, looping state messages back to sources would get the best of both worlds but is probably a pretty gnarly project to get done.

Curious to see what their team thinks

fmmoret commented 2 years ago

@subodh1810 flushing the cdc slot on start-up of the source based on the passed in LSN (instead of every X seconds) is a thing I've seen other OSS connectors do. e.g. IIRC singer / meltano does it

fmmoret commented 2 years ago

@subodh1810 the fix you added looks great for certain edge cases but I'd still be wary of resyncing large dbs just because of a user-initiated job cancel / k8s node reschedule / machine upgrade / temporary network degradation -- which probably happen more often than you'd think & would disproportionally affect/interrupt jobs for large dbs because of longer job durations -> more time to be exposed to random events.

Could flushing only on start up be offered as a flag if your team is against applying it as a general rule?

adam-bloom commented 2 years ago

Agreed with @fmmoret - we have some sources that take 16+ hours to do an initial sync. Those syncs won’t even succeed via the mechanism we use for incremental syncs due to a REST API issue (separate issue open for that), so that’d be all sorts of fun to recover from. The more I noodle on this in the back of my head, the more the flush only on startup option makes sense.

fmmoret commented 2 years ago

CC: @edgao since I saw you in the review.

subodh1810 commented 2 years ago

@fmmoret @adam-bloom Flushing the offset only at the beginning has its own set of problems. Firstly, debezium currently doesn't provide any mechanism to configure custom behaviour for flushing the offset. Secondly, if we don't flush offsets soon enough, WAL logs would start building up and would eventually impact the entire database itself with disk space issues and the like.

@adam-bloom I am not sure I understand the following statement

we have some sources that take 16+ hours to do an initial sync. Those syncs won’t even succeed via the mechanism we use for incremental syncs due to a REST API issue (separate issue open for that), so that’d be all sorts of fun to recover from.

Are you talking about the postgres source or a different source?

adam-bloom commented 2 years ago

@subodh1810 we have ~20+ postgres sources. A handful of those take 8+ hours for the initial syncs, some up to 16 hours. airbytehq/airbyte-internal-issues#719 affects the REST API for very long running jobs, so we have to handle these initial syncs manually and not via a REST API trigger.

fmmoret commented 2 years ago

@subodh1810 re: " if we dont flush offsets soon enough, WAL logs would start building up and eventually would start impacting the entire database itself"

I think that is already happening on initial syncs.

And I could have sworn debezium will let you implement a class to control commit behavior

subodh1810 commented 2 years ago

@fmmoret I see your point, although I still feel that there are scenarios where flushing the LSN only after the load in the destination can cause trouble. Imagine a database with huge WAL logs: they will only be cleared once the data is loaded in the destination. If the destination takes a lot of time, then even assuming it succeeds, the WAL logs won't be cleared until it has finished and a new sync starts. This won't happen in the current implementation, which commits the LSN continuously. I have created an issue to implement an alternative behaviour so that users can choose which LSN commit behaviour they want.

subodh1810 commented 2 years ago

@fmmoret also, if you are aware of a relevant debezium doc which contains info on how to build custom LSN commit behaviour in postgres, can you please share it? I was looking at the doc and didn't find anything obvious in there.

fmmoret commented 2 years ago

@subodh1810 https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=offset.commit.policy

Yep -- it's a debezium engine setting. https://github.com/airbytehq/airbyte/blob/49ee78b7fa6723a98bb320d3c49716e8871d0bf4/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java#L72

https://sourcegraph.com/github.com/debezium/debezium/-/blob/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

FWIW, I don't know if there are any possible race conditions. I don't see a way to make sure it'd flush at start time. It looks like the policy is only checked on debezium exit + after debezium "batches".


It might be more ergonomic to just open a postgres connection before starting debezium and manually confirming the lsn.

PG clients can probably handle this natively / may allow you to just start replication, confirm stashed lsn, exit, start Debezium.

In python with the psycopg2 driver, it's as simple as cursor.start_replication(...), cursor.send_feedback(lsn...). Java driver might be similarly simple.

It looks like probably so: https://jdbc.postgresql.org/documentation/head/replication.html

    PGReplicationStream stream =
        replConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("demo_logical_slot")
            ...
            .start();

    //feedback
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());

Separately, you might be able to accomplish the manual commit via constructing a debezium offset storage writer: https://github.com/debezium/debezium/blob/90a4c0059a5d0a81dd415290be6d6fb5b184f08c/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java#L725

and flushing with it offsetWriter.offset(...) offsetWriter. [etc]

subodh1810 commented 2 years ago

Thanks @fmmoret we will look into how we can flush the LSN

subodh1810 commented 2 years ago

@fmmoret I was going through the OffsetCommitPolicy in debezium, and it determines when debezium should write the offset (its own state) to the state file. It doesn't configure when debezium commits the LSN on the source database. Its description reads: "The policy that defines when the offsets should be committed to offset storage."

fmmoret commented 2 years ago

@subodh1810 Then it might mean one of the other two strategies makes sense.

Somewhat tangentially, the manual flushing would play well with another improvement I mentioned above:

https://github.com/airbytehq/airbyte/issues/12821#issuecomment-1183690934

avoid flushing until some other signal propagates back from the destination container to the source container confirming committed writes up to a point?

The idea of looping destination "state" messages back to the source would tighten up your other concern around "it will only be cleared once the data is loaded in the destination and if the destination takes a lot of time, and assuming it would succeed, the WAL logs won't be cleared until its finished and a new sync starts".

Obviously large task / big steps, but it might be a worthwhile infra/protocol investment if the pattern shows up in other connectors (are CDC connectors the only "stateful" ones?) or if CDC becomes high enough value (or too high maintenance from fragility) for your team.

subodh1810 commented 1 year ago

@fmmoret we just released a new version of Postgres source connector which offers the LSN commit behaviour change as an option. You can choose when the LSN should be acknowledged in the source while setting up the connector