airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[source-mongodb-v2] Update resume token to latest oplog position even when no new records exist #48435

Open zeev-finaloop opened 1 week ago

zeev-finaloop commented 1 week ago

Connector Name

source-mongodb-v2

Connector Version

1.5.12

What step the error happened?

During the sync

Relevant information

Context

When using CDC replication with the oplog, the connector only updates the connection's resume token when new records are found in one of the synced collections. If a connection syncs rarely updated collections while other collections in the same MongoDB database (outside the connection) are updated frequently, each subsequent sync must scan through a large number of irrelevant oplog entries before reaching any updates for its own collections.

Current Behavior

Problem

This creates critical operational issues:

  1. When collections outside our connection update frequently, the oplog grows rapidly with their operations
  2. Our connection, which syncs rarely updated collections, must scan through all these oplog entries
  3. Most of these scanned entries are irrelevant for our connection's collections
  4. As time passes and the number of operations from external collections grows:
    • The sync operation becomes increasingly slower
    • Eventually, the sync operation times out and fails
    • Once a sync fails, subsequent attempts will also fail as they need to process even more historical operations
    • If the resume token becomes too old and the required oplog entries are no longer available, a full refresh is required
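The failure mode above can be illustrated with a minimal simulation (all names are hypothetical; this is not connector code). The oplog is modelled as a list of `(position, collection)` entries, and a sync scans forward from the last saved resume position. Because a sync of the rarely updated collections finds no matching records, the resume position never advances, so every subsequent sync re-examines the entire growing backlog:

```python
# Hypothetical model of the behaviour described above, not connector code.
def entries_scanned(oplog, resume_pos, tracked):
    """Return (records_found, entries_examined) for a sync that scans the
    oplog forward from resume_pos, looking for tracked collections."""
    found, examined = 0, 0
    for pos, coll in oplog:
        if pos <= resume_pos:
            continue  # already consumed by a previous sync
        examined += 1
        if coll in tracked:
            found += 1
    return found, examined

# 10,000 writes land in a busy collection that is NOT part of the connection;
# the connection only tracks a collection that received no writes at all.
oplog = [(i, "busy_collection") for i in range(1, 10_001)]
tracked = {"rarely_updated"}

# No tracked records are found, so under the current behaviour the resume
# position stays at 0 and the next sync scans all 10,000 entries again.
found, examined = entries_scanned(oplog, resume_pos=0, tracked=tracked)
```

Each new write to the busy collection adds one more entry that every future sync must examine, which is why the sync time grows until it hits the Debezium poll timeout seen in the logs below.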

Proposed Solution

Update the resume token to the latest oplog position after each sync, regardless of whether new records were found in the connection's collections. This ensures that future syncs will start scanning from a more recent position in the oplog, preventing timeout issues and sync failures.
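A sketch of the proposed decision logic, with hypothetical helper names (the connector's actual offset handling lives in Debezium/CDK code): the state shape mirrors the Debezium offset visible in the logs below (`{"sec": ..., "ord": ..., "resume_token": ...}`), and the placeholder token values are invented for illustration.

```python
# Hedged sketch of the proposed behaviour; names and values are hypothetical.
def next_cdc_state(saved_state, latest_position, records_emitted):
    """Return the CDC state to persist after a sync.

    Current behaviour effectively keeps saved_state when records_emitted == 0.
    The proposal: always fast-forward to the latest observed oplog position,
    so the next sync starts scanning from a recent point in the oplog.
    """
    if latest_position is None:
        return saved_state  # nothing newer was observed; keep the old token
    return latest_position  # advance even when records_emitted == 0

# Old saved offset (values taken from the log output below).
old = {"sec": 1730869980, "ord": 9,
       "resume_token": "82672AFADC000000092B0229296E04"}
# Newer oplog position observed during a sync that emitted zero records
# (the resume_token here is a made-up placeholder).
newer = {"sec": 1730956380, "ord": 3, "resume_token": "<placeholder-token>"}
```

With this logic, a sync that finds no new records for the connection's collections still checkpoints at the newer position instead of repeatedly re-reading the same stretch of oplog.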

Relevant log output

2024-11-07 05:21:44 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT12M30.000654757S in its first call.
2024-11-07 05:21:44 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):108 CDC events queue poll(): returned nothing.
2024-11-07 05:21:44 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(requestClose):275 No records were returned by Debezium in the timeout seconds 750, closing the engine and iterator
2024-11-07 05:21:44 source > INFO main i.d.e.EmbeddedEngine(stop):957 Stopping the embedded engine
2024-11-07 05:21:44 source > INFO main i.d.e.EmbeddedEngine(stop):964 Waiting for PT5M for connector to stop
2024-11-07 05:21:44 replication-orchestrator > SOURCE analytics [airbyte/source-mongodb-v2:1.5.12] | Type: db-sources-debezium-close-reason | Value: TIMEOUT
2024-11-07 05:21:44 source > INFO pool-2-thread-1 i.d.e.EmbeddedEngine(stopTaskAndCommitOffset):765 Stopping the task and engine
2024-11-07 05:21:44 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(stop):406 Stopping down connector
2024-11-07 05:21:45 source > INFO debezium-mongodbconnector-finaloop-prod-change-event-source-coordinator i.d.c.m.e.BufferingChangeStreamCursor(close):403 Awaiting fetcher thread termination
2024-11-07 05:21:55 destination > INFO pool-6-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 3 GB, allocated: 10 MB (10.0 MB), %% used: 0.0032552083333333335 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-11-07 05:21:55 destination > INFO pool-9-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-11-07 05:22:15 source > INFO debezium-mongodbconnector-finaloop-prod-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamEvents):281 Finished streaming
2024-11-07 05:22:15 source > INFO debezium-mongodbconnector-finaloop-prod-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamingConnected):433 Connected metrics set to 'false'
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.d.p.s.SignalProcessor(stop):127 SignalProcessor stopped
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.d.s.DefaultServiceRegistry(close):105 Debezium ServiceRegistry stopped.
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(taskStopped):91 DebeziumEngine notify: task stopped
2024-11-07 05:22:15 source > INFO pool-2-thread-1 o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.d.c.m.MongoDbConnector(stop):82 Stopping MongoDB connector
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.d.c.m.MongoDbConnector(stop):86 Stopped MongoDB connector
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(connectorStopped):83 DebeziumEngine notify: connector stopped
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher(start$lambda$1):60 Debezium engine shutdown. Engine terminated successfully : true
2024-11-07 05:22:15 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher(start$lambda$1):63 Connector 'io.debezium.connector.mongodb.MongoDbConnector' completed normally.
2024-11-07 05:22:15 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):122 CDC events queue poll(): returned nothing, polling again, attempt 1.
2024-11-07 05:22:15 source > INFO main i.a.i.s.m.c.MongoDbCdcStateHandler(saveState):36 Saving Debezium state MongoDbCdcState[state={"[\"finaloop-prod\",{\"server_id\":\"finaloop-prod\"}]":"{\"sec\":1730869980,\"ord\":9,\"resume_token\":\"82672AFADC000000092B0229296E04\"}"}, schema_enforced=true]...
2024-11-07 05:22:15 source > INFO main i.a.c.i.s.r.s.SourceStateIterator(computeNext):84 sending final state message, with count per stream: {} 
2024-11-07 05:22:15 replication-orchestrator > Stream status TRACE received of status: COMPLETE for stream finaloop-prod:amazonorders
2024-11-07 05:22:15 source > WARN main i.a.c.i.b.IntegrationRunner$Companion(stopOrphanedThreads):469                  The main thread is exiting while children non-daemon threads from a connector are still active.
                 Ideally, this situation should not happen...
                 Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
                 The main thread is: main (RUNNABLE)
Thread stacktrace: java.base/java.lang.Thread.getStackTrace(Thread.java:2451)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.dumpThread(IntegrationRunner.kt:525)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.access$dumpThread(IntegrationRunner.kt:366)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$stopOrphanedThreads$1.invoke(IntegrationRunner.kt:474)
       at io.github.oshai.kotlinlogging.internal.MessageInvokerKt.toStringSafe(MessageInvoker.kt:5)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger$warn$1.invoke(LocationAwareKLogger.kt:191)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger$warn$1.invoke(LocationAwareKLogger.kt:191)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger.at(LocationAwareKLogger.kt:43)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger.warn(LocationAwareKLogger.kt:191)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.stopOrphanedThreads(IntegrationRunner.kt:469)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.stopOrphanedThreads$default(IntegrationRunner.kt:458)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:294)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt)
       at io.airbyte.integrations.source.mongodb.MongoDbSource.main(MongoDbSource.java:53)
2024-11-07 05:22:15 source > WARN main i.a.c.i.b.IntegrationRunner$Companion(stopOrphanedThreads):489 Active non-daemon thread info: debezium-mongodbconnector-finaloop-prod-replicator-fetcher-0 (RUNNABLE)
 Thread stacktrace: java.base/sun.nio.ch.Net.poll(Native Method)
        at java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:191)
        at java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201)
        at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
        at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
        at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
        at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099)
        at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:489)
        at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:483)
        at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
        at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1461)
        at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1066)
        at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:175)
        at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:200)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:739)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveMessageWithAdditionalTimeout(InternalStreamConnection.java:603)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:451)
        at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:372)
        at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
        at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:765)
        at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:76)
        at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:209)
        at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:115)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:83)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:74)
        at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:299)
        at com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute(SyncOperationHelper.java:273)
        at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$3(SyncOperationHelper.java:191)
        at com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$0(SyncOperationHelper.java:127)
        at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
        at com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$1(SyncOperationHelper.java:126)
        at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
        at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:125)
        at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:189)
        at com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$12(SyncOperationHelper.java:292)
        at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
        at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:194)
        at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:176)
        at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:193)
        at com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
        at com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource(SyncOperationHelper.java:99)
        at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
        at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:153)
        at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
        at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:187)
        at io.debezium.connector.mongodb.events.BufferingChangeStreamCursor$EventFetcher.run(BufferingChangeStreamCursor.java:221)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)


marcosmarxm commented 2 days ago

@airbytehq/dbsources can someone take a look at this issue?

theyueli commented 2 days ago

Thank you for addressing this issue and for the detailed investigation!

If your fix is ready, we would happily review it! Thanks @zeev-finaloop !

cc @rodireich @xiaohansong