Closed: danieldiamond closed this issue 2 years ago
Just tried the same connection with MySQL 0.4.3 and it works.
A breaking change was likely introduced by this PR; it could therefore also be affecting CDC on MSSQL and Postgres in the same way.
Weird! I'm not sure how my PR could have caused this.
I spent some time trying to reproduce it, and I don't think it's happening because of my changes in this PR: https://github.com/airbytehq/airbyte/pull/5600
Currently we don't handle the scenario where the worker dies. If the worker dies, our source and destination containers just hang. I could not reproduce why the worker dies, but perhaps it's because of resource exhaustion. As part of this issue we should handle the scenario where the worker dies and decide what should happen to the source and destination containers in that case.
@cgardens I would like to have your thoughts on this. One thing we could do: the source and destination containers check whether the parent worker still exists, and if it doesn't, they kill themselves.
We actually handle this explicitly for the Kubernetes case by having another container in the connector pod that kills it if it can't reach back to the worker pod (which is serving a heartbeat signal).
It's a bit harder in docker-compose since we'd need more of a "supervisor" container to manage killing the other containers (it would need to keep track of the relevant containers and handle the heartbeat checking).
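The self-kill idea above can be reduced to a simple liveness check: the connector side records when it last heard a heartbeat from the worker, and decides to terminate once the heartbeat goes stale. This is a hypothetical sketch only (the class and method names are invented for illustration, not Airbyte's actual heartbeat implementation):

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical connector-side liveness check: track the time of the last
// successful heartbeat from the worker and signal self-termination once
// the worker has been silent longer than the allowed timeout.
public class HeartbeatMonitor {
    private final Duration timeout;
    private volatile Instant lastBeat;

    public HeartbeatMonitor(Duration timeout, Instant start) {
        this.timeout = timeout;
        this.lastBeat = start;
    }

    // Called whenever a heartbeat from the worker is received.
    public void beat(Instant now) {
        this.lastBeat = now;
    }

    // True once the heartbeat is stale, i.e. the container should exit
    // rather than hang forever waiting on a dead worker.
    public boolean shouldSelfTerminate(Instant now) {
        return Duration.between(lastBeat, now).compareTo(timeout) > 0;
    }
}
```

A sidecar (or the connector entrypoint itself) would poll `shouldSelfTerminate` on a schedule and kill the process when it returns true.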
To confirm, is the solution here to kill the source and destination containers? That doesn't seem like a viable fix if this happens every time, i.e. the sync never moves on to the normalisation phase.
@danieldiamond agreed! We will be taking this up and releasing a proper fix.
CONTAINER ID NAME CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS
d60c85d16526 thirsty_brahmagupta 100.90% 1.405GiB / 15.38GiB 9.14% 0B / 0B 231MB / 751MB 35
98d821ed12e5 angry_elion 0.07% 1.044GiB / 15.38GiB 6.79% 0B / 0B 227MB / 0B 132
3f4995606fc7 airbyte-db 0.09% 62.71MiB / 15.38GiB 0.40% 54.4MB / 1.13GB 53.1MB / 209kB 41
0c2f8f3d8d4f airbyte-scheduler 0.04% 302.2MiB / 15.38GiB 1.92% 975MB / 12.2MB 277MB / 0B 39
c1433fbc0188 airbyte-server 23.27% 480.7MiB / 15.38GiB 3.05% 139MB / 1.14GB 320MB / 110MB 45
a07ab1e72be0 airbyte-webapp 0.02% 5.172MiB / 15.38GiB 0.03% 1.15GB / 1.15GB 27MB / 24.6kB 5
a64a5cde0ddb airbyte-worker 0.06% 851.5MiB / 15.38GiB 5.41% 3.96MB / 3.73MB 417MB / 0B 119
7d6f14def198 airbyte-temporal 1.07% 61.82MiB / 15.38GiB 0.39% 27.5MB / 45.4MB 399MB / 24.6kB 16
^C
$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d60c85d16526 airbyte/source-mysql:0.4.4 "/airbyte/base.sh re…" 2 hours ago Up 2 hours thirsty_brahmagupta
98d821ed12e5 airbyte/destination-snowflake:0.3.14 "/airbyte/base.sh wr…" 2 hours ago Up 2 hours angry_elion
3f4995606fc7 airbyte/db:0.29.17-alpha "docker-entrypoint.s…" 4 hours ago Up 4 hours 5432/tcp airbyte-db
0c2f8f3d8d4f airbyte/scheduler:0.29.17-alpha "/bin/bash -c bin/${…" 4 hours ago Up 4 hours airbyte-scheduler
c1433fbc0188 airbyte/server:0.29.17-alpha "/bin/bash -c bin/${…" 4 hours ago Up 4 hours 8000/tcp, 0.0.0.0:8001->8001/tcp airbyte-server
a07ab1e72be0 airbyte/webapp:0.29.17-alpha "/docker-entrypoint.…" 4 hours ago Up 4 hours 0.0.0.0:8000->80/tcp airbyte-webapp
8a59f335d32d airbyte/init:0.29.17-alpha "/bin/sh -c './scrip…" 4 hours ago Exited (0) 4 hours ago init
a64a5cde0ddb airbyte/worker:0.29.17-alpha "/bin/bash -c bin/${…" 4 hours ago Up 4 hours airbyte-worker
7d6f14def198 temporalio/auto-setup:1.7.0 "/entrypoint.sh /bin…" 8 days ago Up 4 hours 6933-6935/tcp, 6939/tcp, 7234-7235/tcp, 7239/tcp, 0.0.0.0:7233->7233/tcp airbyte-temporal
Airbyte version: 0.29.17-alpha
OS Version / Instance: AWS EC2
Deployment: Docker
Source Connector and version: MySQL 0.4.4
Destination Connector and version: Snowflake 0.3.14
@subodh1810
Running with 0.4.4, where the "worker dies" issue occurs. Here the worker is still alive but the sync is hanging.
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):200 - {} - Closing database connection pool.
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):202 - {} - Closed database connection pool.
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.s.m.MySqlSource(main):251 - {} - completed source: class io.airbyte.integrations.source.mysql.MySqlSource
tl;dr: confirming the worker container is still alive but gets stuck before logging DefaultReplicationWorker(run):141 - Source thread complete, and that this issue occurs with both MySQL 0.4.3 and 0.4.4.
More updates. This could be a resource issue, or I lucked out on a random try, but I just doubled the instance to m5.2xlarge and the larger connector that consistently failed has now worked with source 0.4.4.
So I think I've ruled out a resource issue: I tried another connector on the m5.2xlarge (after the previous one completed successfully) and it hangs.
What is interesting, though, is that in this hanging state (where the source container is at ~100% CPU and the worker container is at ~0.04% CPU) I ran docker restart airbyte-worker, which then triggered the COPY INTO commands in Snowflake, but the actual job didn't change state. It was still hanging even though the _AIRBYTE_RAW tables were created.
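The CPU asymmetry described above (source pinned near 100%, worker near idle) is easy to spot from `docker stats` output. A small hypothetical helper (the `high_cpu` function name and threshold are invented for illustration) that flags containers above a CPU threshold might look like:

```shell
#!/bin/sh
# Hypothetical helper: given lines of the form "<name> <cpu%>" on stdin
# (e.g. from `docker stats --no-stream --format '{{.Name}} {{.CPUPerc}}'`),
# print the names of containers whose CPU usage exceeds a threshold.
high_cpu() {
  threshold="${1:-90}"
  while read -r name cpu; do
    # Strip the trailing % sign and compare numerically via awk.
    val="${cpu%\%}"
    if awk -v v="$val" -v t="$threshold" 'BEGIN { exit !(v > t) }'; then
      echo "$name"
    fi
  done
}

# Example usage against a live daemon:
#   docker stats --no-stream --format '{{.Name}} {{.CPUPerc}}' | high_cpu 90
```

A source container that stays flagged across repeated samples while the worker never appears is a strong hint the sync is in the hung state described in this thread.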
Update: I attempted to retry this connection (MySQL CDC for one table with 60M rows and ~5 GB of data in the source, although a successful sync shows ~17 GB in the job sync status when run as STANDARD). Anyway, it appears to push past the hanging stage! 🎉 But then I run into another issue, which repeatedly emits this error with no end:
2021-09-26 02:45:03 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: UNAVAILABLE: Network closed for unknown reason
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:124) ~[io.airbyte-airbyte-workers-0.29.22-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:144) ~[io.airbyte-airbyte-workers-0.29.22-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:123) ~[io.airbyte-airbyte-workers-0.29.22-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:67) ~[io.airbyte.airbyte-scheduler-app-0.29.22-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:65) [io.airbyte.airbyte-scheduler-app-0.29.22-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:43) [io.airbyte.airbyte-scheduler-app-0.29.22-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:114) [io.airbyte-airbyte-commons-0.29.22-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:98) [io.airbyte-airbyte-commons-0.29.22-alpha.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Update: I successfully synced the 60M-row table that I was previously having this issue with, but now, attempting the entire schema (~250M rows), we're back at the hanging issue.
Source: MySQL 0.4.8
Destination: Snowflake 0.3.16
Sync mode: CDC
Loading method: AWS S3 Staging
Airbyte version: 0.30.20-alpha
Seeing the same thing for Postgres -> Snowflake with v0.32.1-alpha:
2021-11-18 08:42:25 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 74243000
destination - 2021-11-18 08:42:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:26 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 46 containing 10.01 MB]
destination - 2021-11-18 08:42:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:26 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 47 containing 10.01 MB]
destination - 2021-11-18 08:42:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:26 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 48 containing 10.01 MB]
destination - 2021-11-18 08:42:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:27 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 49 containing 10.01 MB]
2021-11-18 08:57:58 WARN () ActivityExecutionContextImpl(doHeartBeat):153 - Heartbeat failed
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 6.648743300s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.4:7233]]]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.recordActivityTaskHeartbeat(WorkflowServiceGrpc.java:2710) ~[temporal-serviceclient-1.0.4.jar:?]
at io.temporal.internal.sync.ActivityExecutionContextImpl.sendHeartbeatRequest(ActivityExecutionContextImpl.java:203) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.ActivityExecutionContextImpl.doHeartBeat(ActivityExecutionContextImpl.java:147) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.ActivityExecutionContextImpl.heartbeat(ActivityExecutionContextImpl.java:108) ~[temporal-sdk-1.0.4.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:46) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:216) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
destination - 2021-11-18 08:57:58 ERROR () LineGobbler(voidCall):82 - /airbyte/javabase.sh: line 12: 9 Killed /airbyte/bin/"$APPLICATION" "$@"
2021-11-18 08:57:58 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 120.999879060s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.4:7233]]]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:110) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:131) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:109) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:51) ~[io.airbyte.airbyte-scheduler-app-0.32.1-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:51) [io.airbyte.airbyte-scheduler-app-0.32.1-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:22) [io.airbyte.airbyte-scheduler-app-0.32.1-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:94) [io.airbyte-airbyte-commons-0.32.1-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:78) [io.airbyte-airbyte-commons-0.32.1-alpha.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-18 09:43:23 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$3):203 - Running sync worker cancellation...
2021-11-18 09:43:23 INFO () DefaultReplicationWorker(cancel):250 - Cancelling replication worker...
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(cancel):258 - Cancelling destination...
2021-11-18 09:43:33 INFO () DefaultAirbyteDestination(cancel):120 - Attempting to cancel destination process...
2021-11-18 09:43:33 INFO () DefaultAirbyteDestination(cancel):125 - Destination process exists, cancelling...
2021-11-18 09:43:33 INFO () DefaultAirbyteDestination(cancel):127 - Cancelled destination process!
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(cancel):265 - Cancelling source...
2021-11-18 09:43:33 INFO () DefaultAirbyteSource(cancel):128 - Attempting to cancel source process...
2021-11-18 09:43:33 INFO () DefaultAirbyteSource(cancel):133 - Source process exists, cancelling...
2021-11-18 09:43:33 WARN () LineGobbler(voidCall):86 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2021-11-18 09:43:33 INFO () DefaultAirbyteSource(cancel):135 - Cancelled source process!
2021-11-18 09:43:33 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$3):207 - Interrupting worker thread...
2021-11-18 09:43:33 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$3):210 - Cancelling completable future...
2021-11-18 09:43:33 INFO () TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling...
2021-11-18 09:43:33 WARN () CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):49 - Job either timeout-ed or was cancelled.
2021-11-18 09:43:33 WARN () POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=73f4cfbc-0bc5-3073-9135-8409eb33ec1b, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:213) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:48) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:216) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-18 09:43:33 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Broken pipe
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:120) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Suppressed: io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:122) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:101) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Suppressed: java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:442) ~[?:?]
at java.io.OutputStream.write(OutputStream.java:162) ~[?:?]
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:321) ~[?:?]
at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:325) ~[?:?]
at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:159) ~[?:?]
at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251) ~[?:?]
at java.io.BufferedWriter.flush(BufferedWriter.java:257) ~[?:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.notifyEndOfStream(DefaultAirbyteDestination.java:93) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:106) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:101) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.RuntimeException: java.io.IOException: Broken pipe
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:214) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
... 1 more
Caused by: java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method) ~[?:?]
at java.io.FileOutputStream.write(FileOutputStream.java:347) ~[?:?]
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:312) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:290) ~[?:?]
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:131) ~[?:?]
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:208) ~[?:?]
at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
at java.io.BufferedWriter.write(BufferedWriter.java:233) ~[?:?]
at java.io.Writer.write(Writer.java:249) ~[?:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:85) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:33) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:199) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
... 1 more
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@6ce23dad[status=cancelled,recordsSynced=74243233,bytesSynced=35433328953,startTime=1637214273337,endTime=1637228613362]
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(run):159 - Source output at least one state message
2021-11-18 09:43:33 WARN () DefaultReplicationWorker(run):172 - State capture: No state retained.
Running with https://github.com/airbytehq/airbyte/commit/51866fd37a3b702b406612898697ba0f977abfb9 seems to work and actually finish the job :+1:.
I've created a clean k8s deployment of Airbyte with one connector and one table, and this still occurs. I'm not sure how (or whether) any user is using Airbyte to migrate large tables with CDC.
Airbyte version: 0.35.12-alpha
OS Version / Instance: AWS EC2
Deployment: Kubernetes
Source Connector and version: MySQL 0.5.1
Destination Connector and version: Snowflake 0.4.8
Severity: Critical
Here are four separate users with various sources/destinations who seem to be experiencing this issue:
Airbyte version: 0.35.15-alpha
Deployment: Cloud
Source Connector and version: MySQL Strict Encrypt 0.1.3 (CDC)
Destination Connector and version: Snowflake 0.4.8 (Internal Staging and Copy S3)
@danieldiamond I did some lengthy experiments in cloud using the same database schema as in the description, with 300 million rows. In none of the cases did the connection hang, but I confirm the connections failed with the following errors:
1) https://cloud.airbyte.io/workspaces/071c149e-9bca-4a99-84c6-3c9b02f566a7/connections/70a22d3a-dceb-46a9-bf7d-14ce7157b8bb
Caused by: java.net.SocketException: Broken pipe
at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420) ~[?:?]
at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) ~[?:?]
at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826) ~[?:?]
at java.net.Socket$SocketOutputStream.write(Socket.java:1035) ~[?:?]
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?]
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?]
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?]
at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
at java.io.BufferedWriter.write(BufferedWriter.java:233) ~[?:?]
at java.io.Writer.write(Writer.java:249) ~[?:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90) ~[io.airbyte-airbyte-workers-0.35.15-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$5(DefaultReplicationWorker.java:277) ~[io.airbyte-airbyte-workers-0.35.15-alpha.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
... 1 more
Caused by: java.net.SocketException: Connection reset by peer
at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420) ~[?:?]
at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) ~[?:?]
at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826) ~[?:?]
at java.net.Socket$SocketOutputStream.write(Socket.java:1035) ~[?:?]
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?]
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?]
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?]
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?]
3) https://cloud.airbyte.io/workspaces/071c149e-9bca-4a99-84c6-3c9b02f566a7/connections/a509a140-589b-4771-8850-359b416264ff
Caused by: java.net.SocketException: Connection timed out
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234)
at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304)
at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132)
at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:205)
at java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120)
at java.base/java.io.BufferedWriter.write(BufferedWriter.java:233)
at java.base/java.io.Writer.write(Writer.java:249)
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90)
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$5(DefaultReplicationWorker.java:277)
... 4 more
@VitaliiMaltsev are you suggesting that it doesn't work in the cloud either?
separately, rereading earlier comments: @jrhizor @subodh1810
We actually handle this explicitly for the Kubernetes case by having another container in the connector pod that kills it if it can't reach back to the worker pod (which is serving a heartbeat signal).
It's a bit harder in docker-compose since we'd need more of a "supervisor" container to manage killing the other containers (it would need to keep track of the relevant containers and handle the heartbeat checking).
I've tried this now with k8s and an unreasonable amount of resources to ensure this isn't a resource/memory issue. The job still hangs after reading all the records. Is the issue then still the worker dying? If that scenario is explicitly handled in k8s, then the handling might not be working as expected.
FYI @VitaliiMaltsev this seems to be the exact same issue, with a lot more context if you're interested https://github.com/airbytehq/airbyte/issues/8218
All syncs permanently fail with the latest master branch:
io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:252) ~[temporal-sdk-1.6.0.jar:?]
at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:209) ~[temporal-sdk-1.6.0.jar:?]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:193) ~[temporal-sdk-1.6.0.jar:?]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:151) ~[temporal-sdk-1.6.0.jar:?]
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73) ~[temporal-sdk-1.6.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.util.concurrent.CancellationException
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478) ~[?:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:217) ~[io.airbyte-airbyte-workers-0.35.16-alpha.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:53) ~[io.airbyte-airbyte-workers-0.35.16-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:220) ~[io.airbyte-airbyte-workers-0.35.16-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
... 3 more
LOG
```
2021-08-31 02:27:03 INFO () WorkerRun(call):62 - Executing worker wrapper. Airbyte version: 0.29.13-alpha
2021-08-31 02:27:04 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.29.13-alpha
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(run):102 - start sync worker. job id: 7000 attempt id: 0
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(run):111 - configured sync modes: {myschema.tableA=incremental - append_dedup, myschema.tableb=incremental - append_dedup, myschema.tableC=incremental - append_dedup, myschema.tableD=incremental - append_dedup, myschema.tableE=incremental - append_dedup, myschema.tableF=incremental - append_dedup, myschema.tableH=incremental - append, myschema.tableI=incremental - append_dedup, myschema.tableJ=incremental - append_dedup, myschema.tableK=incremental - append_dedup}
2021-08-31 02:27:04 INFO () DefaultAirbyteDestination(start):78 - Running destination...
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - Checking if airbyte/destination-snowflake:0.3.12 exists...
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - airbyte/destination-snowflake:0.3.12 was found locally.
2021-08-31 02:27:04 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/7000/0 --network host --log-driver none airbyte/destination-snowflake:0.3.12 write --config destination_config.json --catalog destination_catalog.json
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - Checking if airbyte/source-mysql:0.4.4 exists...
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - airbyte/source-mysql:0.4.4 was found locally.
2021-08-31 02:27:04 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/7000/0 --network host --log-driver none airbyte/source-mysql:0.4.4 read --config source_config.json --catalog source_catalog.json
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(run):139 - Waiting for source thread to join.
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):246 - Destination output thread started.
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):210 - Replication thread started.
2021-08-31 02:27:10 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:10 INFO i.a.i.d.s.SnowflakeDestination(main):81 - {} - starting destination: class io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-31 02:27:10 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:10 INFO i.a.i.s.m.MySqlSource(main):249 - {} - starting source: class io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: WRITE
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {read=null, catalog=source_catalog.json, config=source_config.json}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: READ
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.d.j.c.SwitchingDestination(getConsumer):83 - {} - Using destination type: INSERT
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 INFO i.a.i.s.r.CdcStateManager(
```

Snippet from a successful sync with the same source + destination connectors:
```
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:34 INFO i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:34 INFO i.a.i.s.m.MySqlSource(main):251 - {} - completed source: class io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 01:55:36 INFO () DefaultReplicationWorker(run):141 - Source thread complete.
2021-08-31 01:55:36 INFO () DefaultReplicationWorker(run):142 - Waiting for destination thread to join.
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:36 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:36 INFO i.a.i.d.b.BufferedStreamConsumer(close):212 - {} - executing on success close procedure.
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:36 INFO i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 62
2021-08-31 01:55:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:38 INFO i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 237
2021-08-31 01:55:39 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:39 INFO i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 1442
2021-08-31 01:55:41 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:41 INFO i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 48
2021-08-31 01:55:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:42 INFO i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 77
2021-08-31 01:55:44 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:44 INFO i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 21
2021-08-31 01:55:45 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):176 - {} - Finalizing tables in destination started for 12 streams ...
```
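The fix direction discussed earlier in this thread (connector containers self-terminating when the parent worker disappears, as Kubernetes already does via a heartbeat sidecar) could be sketched for the docker-compose case roughly as follows. Everything here is hypothetical: the heartbeat URL, port, interval, and failure threshold are illustrative, not Airbyte's actual implementation.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical connector-side watchdog: poll the parent worker's heartbeat
// endpoint and exit once it stops responding for several checks in a row.
public class HeartbeatWatchdog {
    public static void main(String[] args) throws InterruptedException {
        // Illustrative URL; nothing is listening here, so the loop below
        // fails fast and demonstrates the self-termination path.
        String url = args.length > 0 ? args[0] : "http://127.0.0.1:9/heartbeat";
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        final int maxFailures = 3;
        int consecutiveFailures = 0;

        while (consecutiveFailures < maxFailures) {
            try {
                HttpResponse<Void> resp = client.send(
                        HttpRequest.newBuilder(URI.create(url)).GET().build(),
                        HttpResponse.BodyHandlers.discarding());
                consecutiveFailures = (resp.statusCode() == 200) ? 0 : consecutiveFailures + 1;
            } catch (Exception e) {
                // Connection refused / timeout: count it as a missed heartbeat.
                consecutiveFailures++;
            }
            Thread.sleep(200); // shortened for illustration; real checks would be seconds apart
        }
        // A real connector container would terminate its process tree here
        // instead of just logging.
        System.out.println("worker heartbeat lost, self-terminating");
    }
}
```

This would prevent the hung source/destination containers described above from lingering after the worker dies, without needing a separate supervisor container.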