airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
16.25k stars 4.15k forks source link

[source-mongodb] Incremental sync error on specific collection - Saved offset is not valid #48496

Open Hugoch opened 1 week ago

Hugoch commented 1 week ago

Connector Name

source-mongodb

Connector Version

1.5.12

What step the error happened?

During the sync

Relevant information

After a successful initial sync, incremental replication randomly fails after 1-10 successful ones with a Saved offset is not valid. Please reset the connection, and then increase oplog retention and/or increase sync frequency to prevent this from happening in the future.

I manually checked the resume token from the logs which is invalid, but manually testing a resume token from a previous successful sync works (which seems to confirm that oplog is not truncated).

Reducing the replication frequency to 5 min (with oplog ~4days retention) does not fix.

Some other collections from the same cluster sync without issues.

More info

cc @theyueli

Relevant log output

2024-11-04 18:30:19 source > INFO main i.a.i.s.m.c.MongoDbDebeziumStateUtil(isValidResumeToken):124 Exception : Command failed with error 280 (ChangeStreamFatalError): 'PlanExecutor error during aggregation :: caused by :: cannot resume stream; the resume token was not found. {_data: "82672911AA000000042B042C0100296E5A1004111CD8A7C3AC4108BD652688BF9B4961463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F696400646607170ADC704B0B4CEF9E4F000004"}' on server foo-prod-shard-00-02.abcde.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: cannot resume stream; the resume token was not found. {_data: \"82672911AA000000042B042C0100296E5A1004111CD8A7C3AC4108BD652688BF9B4961463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F696400646607170ADC704B0B4CEF9E4F000004\"}", "code": 280, "codeName": "ChangeStreamFatalError", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730745019, "i": 5}}, "signature": {"hash": {"$binary": {"base64": "8gooMnQDcDZ4N47zoh55j9GzBjk=", "subType": "00"}}, "keyId": 7428267533251117062}}, "operationTime": {"$timestamp": {"t": 1730745019, "i": 4}}}
2024-11-04 18:30:19 source > INFO main i.a.i.s.m.c.MongoDbDebeziumStateUtil(isValidResumeToken):125 Invalid resume token '826729119A0000003D2B042C0100296E5A100479290C5D090D4810962649B5F651BDB3463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006466BDC93807F670F0E483042F000004' present, corresponding to timestamp (seconds after epoch) : 1730744730, due to reason Command failed with error 280 (ChangeStreamFatalError): 'PlanExecutor error during aggregation :: caused by :: cannot resume stream; the resume token was not found. {_data: "82672911AA000000042B042C0100296E5A1004111CD8A7C3AC4108BD652688BF9B4961463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F696400646607170ADC704B0B4CEF9E4F000004"}' on server foo-prod-shard-00-02.abcde.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: cannot resume stream; the resume token was not found. {_data: \"82672911AA000000042B042C0100296E5A1004111CD8A7C3AC4108BD652688BF9B4961463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F696400646607170ADC704B0B4CEF9E4F000004\"}", "code": 280, "codeName": "ChangeStreamFatalError", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730745019, "i": 5}}, "signature": {"hash": {"$binary": {"base64": "8gooMnQDcDZ4N47zoh55j9GzBjk=", "subType": "00"}}, "keyId": 7428267533251117062}}, "operationTime": {"$timestamp": {"t": 1730745019, "i": 4}}}
2024-11-04 18:30:19 replication-orchestrator > SOURCE analytics [airbyte/source-mongodb-v2:1.5.12] | Type: db-sources-cdc-cursor-invalid | Value: 1
2024-11-04 18:30:19 source > ERROR main i.a.i.s.m.MongoDbSource(read):157 Unable to perform sync read operation. io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is not valid. Please reset the connection, and then increase oplog retention and/or increase sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details
    at io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer.createCdcIterators(MongoDbCdcInitializer.java:136) ~[io.airbyte.airbyte-integrations.connectors-source-mongodb-v2.jar:?]
    at io.airbyte.integrations.source.mongodb.MongoDbSource.read(MongoDbSource.java:149) [io.airbyte.airbyte-integrations.connectors-source-mongodb-v2.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.integrations.source.mongodb.MongoDbSource.main(MongoDbSource.java:53) [io.airbyte.airbyte-integrations.connectors-source-mongodb-v2.jar:?]

Stack Trace: io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is not valid. Please reset the connection, and then increase oplog retention and/or increase sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details
    at io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer.createCdcIterators(MongoDbCdcInitializer.java:136)
    at io.airbyte.integrations.source.mongodb.MongoDbSource.read(MongoDbSource.java:149)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt)
    at io.airbyte.integrations.source.mongodb.MongoDbSource.main(MongoDbSource.java:53)

2024-11-04 18:30:19 source > ERROR main i.a.c.i.u.ConnectorExceptionHandler(handleException):68 caught exception! io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is not valid. Please reset the connection, and then increase oplog retention and/or increase sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details
    at io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer.createCdcIterators(MongoDbCdcInitializer.java:136) ~[io.airbyte.airbyte-integrations.connectors-source-mongodb-v2.jar:?]
    at io.airbyte.integrations.source.mongodb.MongoDbSource.read(MongoDbSource.java:149) ~[io.airbyte.airbyte-integrations.connectors-source-mongodb-v2.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290) ~[airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt) [airbyte-cdk-core-0.45.1.jar:?]
    at io.airbyte.integrations.source.mongodb.MongoDbSource.main(MongoDbSource.java:53) [io.airbyte.airbyte-integrations.connectors-source-mongodb-v2.jar:?]

Stack Trace: io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is not valid. Please reset the connection, and then increase oplog retention and/or increase sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details
    at io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcInitializer.createCdcIterators(MongoDbCdcInitializer.java:136)
    at io.airbyte.integrations.source.mongodb.MongoDbSource.read(MongoDbSource.java:149)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt)
    at io.airbyte.integrations.source.mongodb.MongoDbSource.main(MongoDbSource.java:53)

2024-11-04 18:30:19 replication-orchestrator > readFromSource: source exception
io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1
    at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:364) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:222) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

Contribute

marcosmarxm commented 1 week ago

Thanks for reporting the issue @Hugoch .

@airbytehq/dbsources can someone take a look into this issue?

rodireich commented 1 week ago

Hi @Hugoch thanks for reaching out. Do you see any schema change happening before these failures?

Would you be able to share a successive logs from the an initial sync that finished successfully and an incremental sync the failed afterwards?

rodireich commented 1 week ago

The following cloud workspace may be showing the same error

theyueli commented 1 week ago

@rodireich @xiaohansong
This issue could be related: https://github.com/airbytehq/airbyte/issues/48435

ngocquang2797 commented 4 days ago

This is a similar issue, but it hasn’t been resolved yet.

ngocquang2797 commented 4 days ago

I’m also encountering a similar issue with the error “saved offset is not valid” when syncing data every 30 minutes. I checked the replication info and found that the log length from start to end is approximately 72 hours. Image

Image I increased the sync frequency to every 6 minutes, but I’m still encountering this error.

Hugoch commented 4 days ago

@ngocquang2797 It seems that your actual Oplog size is bigger than your configured Oplog size. This may be due to your majority commit point being far in the past, so you probably have a large replication lag? I am not sure it is related to the issue, but it might be worth investigating.