airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source MSSQL - Sync CDC with New Changes Only option not working. #18860

Open quannh-uet opened 1 year ago

quannh-uet commented 1 year ago
## Environment

- **Airbyte version**: 0.35.65-alpha
- **OS Version / Instance**: AWS EC2
- **Deployment**: Docker
- **Source Connector and version**: MSSQL 0.4.24
- **Destination Connector and version**: Snowflake 0.4.38
- **Step where error happened**: Set up a new connection and sync data

## Current Behavior

I set up a new source connector in CDC mode with the `New Changes Only` option. After that, when I sync data, the job reads the table as if the `Existing and New` option were selected.

## Expected Behavior

The sync job should run with the `New Changes Only` option.

## Logs

Source connector config:

```
{
  "host": "xx.xx.xx.xx",
  "port": 1433,
  "schemas": [
    "dbo"
  ],
  "database": "xxx",
  "password": "xxx",
  "username": "xxx",
  "ssl_method": {
    "ssl_method": "unencrypted"
  },
  "tunnel_method": {
    "tunnel_method": "NO_TUNNEL"
  },
  "replication_method": {
    "method": "CDC",
    "data_to_sync": "New Changes Only",
    "snapshot_isolation": "Snapshot"
  }
}
```

Debezium config in the log: `snapshot.mode = initial`.

```
2022-11-02 12:14:13 source > Starting SqlServerConnectorTask with configuration:
2022-11-02 12:14:13 source > connector.class = io.debezium.connector.sqlserver.SqlServerConnector
2022-11-02 12:14:13 source > max.queue.size = 8192
2022-11-02 12:14:13 source > provide.transaction.metadata = false
2022-11-02 12:14:13 source > include.schema.changes = false
2022-11-02 12:14:13 source > offset.storage.file.filename = /tmp/cdc-state-offset757126953703380547/offset.dat
2022-11-02 12:14:13 source > decimal.handling.mode = string
2022-11-02 12:14:13 source > converters = mssql_converter
2022-11-02 12:14:13 source > database.history.file.filename = /tmp/cdc-db-history6392748082955249532/dbhistory.dat
2022-11-02 12:14:13 source > database.user = xxx
2022-11-02 12:14:13 source > database.dbname = xxx
2022-11-02 12:14:13 source > offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
2022-11-02 12:14:13 source > mssql_converter.type = io.airbyte.integrations.debezium.internals.MSSQLConverter
2022-11-02 12:14:13 source > database.server.name = xxx
2022-11-02 12:14:13 source > snapshot.isolation.mode = snapshot
2022-11-02 12:14:13 source > database.port = 1433
2022-11-02 12:14:13 source > offset.flush.interval.ms = 1000
2022-11-02 12:14:13 source > key.converter.schemas.enable = false
2022-11-02 12:14:13 source > database.hostname = 172.38.17.93
2022-11-02 12:14:13 source > database.password = ********
2022-11-02 12:14:13 source > name = xx
2022-11-02 12:14:13 source > value.converter.schemas.enable = false
2022-11-02 12:14:13 source > max.batch.size = 2048
2022-11-02 12:14:13 source > table.include.list = dbo.xxx
2022-11-02 12:14:13 source > snapshot.mode = initial
2022-11-02 12:14:13 source > database.history = io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory
2022-11-02 12:14:13 source > database.include.list = xxx
```

## Steps to Reproduce

1. Set up a new source connector in CDC mode with the `New Changes Only` option
2. Run a sync job
3. View the log

## Are you willing to submit a PR?

Yes, I'll create a PR fixing that.
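
The mismatch described in this report can be checked mechanically by comparing the `data_to_sync` value in the source config with the `snapshot.mode` that Debezium prints at startup. The following is a minimal sketch, not the connector's actual code. It assumes that `New Changes Only` should translate to Debezium's `schema_only` snapshot mode and `Existing and New` to `initial`; that mapping, and the helper names `expected_snapshot_mode` and `debezium_properties`, are hypothetical and used here only for illustration.

```python
import re

# Assumed mapping from the connector's `data_to_sync` option to Debezium's
# `snapshot.mode`. This is an illustration, not taken from the Airbyte source.
EXPECTED_SNAPSHOT_MODE = {
    "Existing and New": "initial",
    "New Changes Only": "schema_only",
}

# Matches log lines such as:
#   2022-11-02 12:14:13 source > snapshot.mode = initial
PROP_LINE = re.compile(r"source > (?P<key>[\w.]+) = (?P<value>.*)$")


def expected_snapshot_mode(source_config: dict) -> str:
    """Return the snapshot mode the config should produce under the assumed mapping."""
    method = source_config["replication_method"]
    return EXPECTED_SNAPSHOT_MODE[method["data_to_sync"]]


def debezium_properties(log_text: str) -> dict:
    """Collect `key = value` pairs from the Debezium startup section of a sync log."""
    props = {}
    for line in log_text.splitlines():
        match = PROP_LINE.search(line.strip())
        if match:
            props[match["key"]] = match["value"].strip()
    return props


# Values copied from the config and log excerpt in this report.
config = {
    "replication_method": {
        "method": "CDC",
        "data_to_sync": "New Changes Only",
        "snapshot_isolation": "Snapshot",
    }
}
log_excerpt = """\
2022-11-02 12:14:13 source > table.include.list = dbo.xxx
2022-11-02 12:14:13 source > snapshot.mode = initial
"""

expected = expected_snapshot_mode(config)
actual = debezium_properties(log_excerpt)["snapshot.mode"]
print(f"expected={expected} actual={actual} match={expected == actual}")
```

With the values from this report the check prints `match=False`, which is the behavior the issue describes: the config requests new changes only, but the connector starts Debezium with `snapshot.mode = initial`.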

natalyjazzviolin commented 1 year ago

Hi @quannh-uet! Could you please double-check the Airbyte version you listed, and correct it if needed?

quannh-uet commented 1 year ago

Hi @natalyjazzviolin, the Airbyte, source connector, and destination connector versions are correct. I'll create a PR to fix this bug.

quannh-uet commented 1 year ago

Hi @natalyjazzviolin could you review my PR above?

natalyjazzviolin commented 1 year ago

Hey @quannh-uet, we have a code freeze in place until next week; after that the team will be able to review and merge your PR. Meanwhile, could you run your integration tests locally, make sure everything passes, and include the logs in the PR? This will make the review process quicker!