airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
16.04k stars 4.11k forks source link

[source-postgres] Change Data Capture (CDC) Not Working #47547

Open Nanagoud007 opened 4 days ago

Nanagoud007 commented 4 days ago

Topic

I have enabled Change Data Capture (CDC) on my PostgreSQL source following the instructions provided in the Airbyte PostgreSQL source documentation. However, during each sync, Airbyte is loading the entire dataset instead of only the changes. This issue persists even when there are no updates in the source database.

Relevant information

Steps Taken:

Logs:This 4th time ran sync logdefault_workspace_logs_1388_txt.txt

Expected Behavior: Airbyte should only sync the changed records instead of reloading the entire dataset on every sync.

Actual Behavior: Each sync loads the whole dataset, resulting in increased load times and performance issues.

Additional Information: PostgreSQL Version: 16 Airbyte Version: 0.58.0 >

Questions:

  1. What could be the underlying cause of the incremental changes not being detected?
  2. Are there specific logs or configurations I should examine to troubleshoot this issue?

I would appreciate any assistance in diagnosing and resolving this problem.

jduckwo commented 4 days ago

I am running into a similar issue. It appears that the subscriber to the replication slot is not being registered properly. When I check the active status of the replication slot, it is set to active=false and the restart_lsn is not advancing as expected. I have dropped/recreated the PG publication, PG Replication Slot, AB Source, AB Destination, and AB Connection objects and still cannot get the incremental replication to work properly. I also verified in the Airbyte logs that the query against the pg_replication_slots view does find the replication slot.

Here is my current config:

Postgres config:

ALTER SYSTEM SET wal_keep_size = '1GB';
SELECT pg_reload_conf();
SHOW wal_keep_size;

CREATE ROLE airbyte_role;
ALTER ROLE airbyte_role REPLICATION LOGIN;

GRANT SELECT ON TABLE public.table_1 TO airbyte_role;
ALTER TABLE public.table_1 REPLICA IDENTITY DEFAULT;

CREATE PUBLICATION airbyte_publication FOR TABLES IN SCHEMA public;
SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');

I also would appreciate any help getting this resolved. If helpful, I can provide my logs in addition to the ones in the original post.

Nanagoud007 commented 4 days ago

@jduckwo active status will be true when the sync is running

jduckwo commented 4 days ago

@Nanagoud007 I queried the status repeatedly while the sync was running and did not see it ever return as active. Also, the restart_lsn is not advancing as expected. Here are the logs from my latest run:

default_workspace_logs_84_txt.txt

Nanagoud007 commented 4 days ago

@jduckwo in my log this is causing the full refresh for me 2024-10-28 10:04:37 platform > SOURCE analytics [airbyte/source-postgres:3.6.22] | Type: db-sources-cdc-cursor-invalid | Value: 1

Nanagoud007 commented 4 days ago

@jduckwo i have analysed your attached log the sync mode is full refresh in you case ime","airbyte_type":"timestamp_with_timezone"}}},supportedSyncModes=[full_refresh],sourceDefined =true,defaultCursorField=[_ab_cdc_lsn],sourceDefinedPrimaryKey=[[id]],namespace=reporting,isResumable=true,additionalProperties={}],syncMode=full_refresh,cursorField=[_ab_cdc_lsn],destinationSyncMode=overwrite,primaryKey=[[id]],generationId=55,minimumGenerationId=55,syncId=84,additionalProperties={}] will sync in resumeable full refresh mode.

jduckwo commented 4 days ago

There are multiple tables and views in that specific log. Some of those are set to have a full sync while others are not. table_1 should be set to incremental. I am currently upgrading Airbyte to the latest version and recreating all my resources. Once that is done, I will upload a cleaner log with the one table.

Nanagoud007 commented 4 days ago

@jduckwo sure, first you try to give publisher to one table let me know if you make any progress, i tried other method xmin column it re-syncing very past only down site is no deletions are captured

jduckwo commented 4 days ago

I rebuilt my Airbyte installation, source, destination, and connection. I dropped and recreated my PG publication and replication slot. I granted access to 1 table table_1 to the database role I am using for the connection.

table_1

results

I attached the logs from my latest run. They should be cleaner then the earlier log I posted. Image default_workspace_logs_12_txt.txt

marcosmarxm commented 3 days ago

Hello @jduckwo and @Nanagoud007 I shared this issue with the connector team to take a look.

@Nanagoud007 it looks you're using an old version of Airbyte. Is it possible to upgrade to latest? Also can you share what is the source connector version you're using?

rodireich commented 3 days ago

The above log provided by @Nanagoud007 is showing that the last saved offset is no longer on the database server and as a result we are falling back to rebuilding the tables from scratch :

2024-10-28 10:04:37 source > WARN main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):188 Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch

Postgres has a retention period for CDC logs. In order for CDC based incremental sync to function we need Airbyte's sync timeout to be shorter than the retention period so postgres won't recycle the CDC logs we need before airbyte has a chance to read them. Is this happening on every attempt to sync?

Nanagoud007 commented 3 days ago

@marcosmarxm These re the versions of air byte services Image and i am using air byte 1.0.0

Airbyte service containersImage

Nanagoud007 commented 3 days ago

@rodireich Thank you for clarifying! To ensure that the sync timeout is shorter than PostgreSQL’s CDC log retention period, could you advise on where exactly I should look to check and configure these settings? Specifically, if there are any files or parameters within Airbyte or PostgreSQL that I need to adjust, I'd appreciate some direction. Also, is there a recommended timeout setting for the sync to avoid this issue?

and when i change the Invalid CDC position behavior (Advanced) to fail sync it is giving me error and if make it re-sync it will refresh whole Failure in source: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.

jduckwo commented 3 days ago

@rodireich

For my setup (I'm on the latest version of AB and the PG connectors), I change the InvalidCdcCursorPositionBehaviour to Fail Sync for testing to see if I could flush out any more details. It looks like the first sync is successful, then every sync after fails (even if triggered immediately after the first sync) with this error:

Failure in source: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.

rodireich commented 3 days ago

when i change the Invalid CDC position behavior (Advanced) to fail sync it is giving me error and if make it re-sync it will refresh whole Failure in source

@Nanagoud007 this is the expected behavior. Rather than falling automatically to a full rebuild of the table it will stop with an error asking the admin to do so. This is intended for making the problem visible rather than resending by itself.

jduckwo commented 3 days ago

@rodireich I think it is understood that this is the expected behavior when this error occurs if the InvalidCdcCursorPositionBehaviour is set to Fail Sync. The question is how do we correct this issue (what settings do we need to change) on the Postgres server so that the incremental loads run as expected. Is this in fact an issue with the Postgres config, or is this a problem with the AB connector itself.

this is the expected behavior. Rather than falling automatically to a full rebuild of the table it will stop with an error asking the admin to do so. This is intended for making the problem visible rather than resending by itself.

rodireich commented 2 days ago

@jduckwo the first sync is actually done by going to the table and selecting from it. not some CDC operation. Think of it essentially as a SELECT * from table. The second sync is the one where we attempt to find our place in the CDC log (or WAL in postgres) and in your case if it fails immediately after the first sync it indicates that something is misconfigured. On the log you provided I don't see the same problem with an offset no longer on server. But maybe that log doesn't cover the second sync.

rodireich commented 2 days ago

@Nanagoud007 how often is airbyte set to sync this connection?

jduckwo commented 2 days ago

@rodireich I just recreated a new connection and the sync ran twice. One to initially populate the tables, then the second as an incremental sync (no actual data changes for the second). The sync is scheduled to run every 1 hour. The first completed successfully, the second failed with the aforementioned error. Here are the logs from both runs.

Initial Sync: default_workspace_logs_47_txt.txt Incremental Sync: default_workspace_logs_48_txt.txt

Nanagoud007 commented 2 days ago

@Nanagoud007 how often is airbyte set to sync this connection?

i have set sync for every hour, i have tried minute sync also its not happening