airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Redshift destination INSERT mode: Server closed connection #6388

Closed harshithmullapudi closed 2 years ago

harshithmullapudi commented 3 years ago
## Environment

- Is this your first time deploying Airbyte: Yes
- OS Version / Instance: EC2 t3.large
- Memory / Disk: 8 GB / 500 GB SSD
- Deployment: Docker
- Airbyte Version: 0.29.21-alpha
- Source name/version: Postgres 0.3.11
- Destination name/version: Redshift 0.3.14
- Step: On sync

## Current Behavior

I'm trying to sync for the first time and the process doesn't finish. The source table has around 20M records; the sync runs for hours and then errors out for varying reasons: sometimes a broken pipe, sometimes a null-pointer exception, sometimes something else. Is there a way to do the initial load using an S3 command or some other method, and then have Airbyte do the incremental loads from there?

kailashjoshi018 commented 3 years ago
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9284000
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9285000
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9286000
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9287000
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9288000
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9289000
2021-09-22 21:19:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9290000
2021-09-22 21:19:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:19:04 INFO i.a.i.d.r.RedshiftSqlOperations(insertRecordsInternal):59 - {} - actual size of batch: 10000
2021-09-22 21:19:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9291000
2021-09-22 21:20:20 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:20 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):78 - {} - Airbyte message consumer: failed.
2021-09-22 21:20:20 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:20 ERROR i.a.i.d.b.BufferedStreamConsumer(close):210 - {} - executing on failed close procedure.
2021-09-22 21:20:20 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:20 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):176 - {} - Finalizing tables in destination started for 7 streams
2021-09-22 21:20:20 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:20 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream accounts. schema users, tmp table _airbyte_tmp_vrq_accounts, final table _airbyte_raw_accounts
2021-09-22 21:20:20 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:20 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream payment_methods. schema users, tmp table _airbyte_tmp_bao_payment_methods, final table _airbyte_raw_payment_methods
2021-09-22 21:20:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:21 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream payment_refunds. schema users, tmp table _airbyte_tmp_xjm_payment_refunds, final table _airbyte_raw_payment_refunds
2021-09-22 21:20:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:24 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream payment_transaction_errors. schema users, tmp table _airbyte_tmp_mwt_payment_transaction_errors, final table _airbyte_raw_payment_transaction_errors
2021-09-22 21:20:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:26 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream payment_transactions. schema users, tmp table _airbyte_tmp_qhf_payment_transactions, final table _airbyte_raw_payment_transactions
2021-09-22 21:20:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream user_subscription_transactions. schema users, tmp table _airbyte_tmp_rfq_user_subscription_transactions, final table _airbyte_raw_user_subscription_transactions
2021-09-22 21:20:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:30 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream user_subscriptions. schema users, tmp table _airbyte_tmp_xsr_user_subscriptions, final table _airbyte_raw_user_subscriptions
2021-09-22 21:20:34 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:20:34 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):194 - {} - Executing finalization of tables.
2021-09-22 21:23:48 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):196 - {} - Finalizing tables in destination completed.
2021-09-22 21:23:48 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):199 - {} - Cleaning tmp tables in destination started for 7 streams
2021-09-22 21:23:48 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream accounts. schema users, tmp table name: _airbyte_tmp_vrq_accounts
2021-09-22 21:23:50 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:50 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream payment_methods. schema users, tmp table name: _airbyte_tmp_bao_payment_methods
2021-09-22 21:23:52 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:52 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream payment_refunds. schema users, tmp table name: _airbyte_tmp_xjm_payment_refunds
2021-09-22 21:23:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:55 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream payment_transaction_errors. schema users, tmp table name: _airbyte_tmp_mwt_payment_transaction_errors
2021-09-22 21:23:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:56 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream payment_transactions. schema users, tmp table name: _airbyte_tmp_qhf_payment_transactions
2021-09-22 21:23:58 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:23:58 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream user_subscription_transactions. schema users, tmp table name: _airbyte_tmp_rfq_user_subscription_transactions
2021-09-22 21:24:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:24:00 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream user_subscriptions. schema users, tmp table name: _airbyte_tmp_xsr_user_subscriptions
2021-09-22 21:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-22 21:24:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):208 - {} - Cleaning tmp tables in destination completed.
2021-09-22 21:24:02 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):251 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@1f5d3343[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@119aded8[data={"cdc":false,"streams":[{"stream_name":"accounts","stream_namespace":"public","cursor_field":["account_id"],"cursor":"180017473"},{"stream_name":"payment_methods","stream_namespace":"public","cursor_field":["payment_method_id"],"cursor":"1777153"},{"stream_name":"payment_refunds","stream_namespace":"public","cursor_field":["payment_refund_id"],"cursor":"58619"},{"stream_name":"payment_transaction_errors","stream_namespace":"public","cursor_field":["payment_transaction_error_id"],"cursor":"195999"},{"stream_name":"payment_transactions","stream_namespace":"public","cursor_field":["payment_transaction_id"]},{"stream_name":"user_subscription_transactions","stream_namespace":"public","cursor_field":["user_subscription_transaction_id"]},{"stream_name":"user_subscriptions","stream_namespace":"public","cursor_field":["user_subscription_id"]}]},additionalProperties={}],additionalProperties={}]
2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.sql.SQLException: [Amazon](600001) The server closed the connection.
2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 -     at com.amazon.support.channels.TLSSocketChannel.read(Unknown Source)
2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 - Caused by: com.amazon.support.exceptions.GeneralException: [Amazon](600001) The server closed the connection.
2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 -     ... 1 more
2021-09-23 00:56:50 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):199 - Running sync worker cancellation...
2021-09-23 00:56:50 INFO () DefaultReplicationWorker(cancel):270 - Cancelling replication worker...
2021-09-23 00:57:00 INFO () DefaultReplicationWorker(cancel):278 - Cancelling destination...
2021-09-23 00:57:00 INFO () DefaultAirbyteDestination(cancel):134 - Attempting to cancel destination process...
2021-09-23 00:57:00 INFO () DefaultAirbyteDestination(cancel):139 - Destination process exists, cancelling...
2021-09-23 00:57:00 INFO () DefaultAirbyteDestination(cancel):141 - Cancelled destination process!
2021-09-23 00:57:00 INFO () DefaultReplicationWorker(cancel):285 - Cancelling source...
2021-09-23 00:57:00 INFO () DefaultAirbyteSource(cancel):141 - Attempting to cancel source process...
2021-09-23 00:57:00 INFO () DefaultAirbyteSource(cancel):146 - Source process exists, cancelling...
2021-09-23 00:57:00 WARN () LineGobbler(voidCall):88 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2021-09-23 00:57:00 ERROR () DefaultReplicationWorker(run):148 - Sync worker failed.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Broken pipe
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:140) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:52) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:165) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at java.lang.Thread.run(Thread.java:832) [?:?]
    Suppressed: io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
        at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:135) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:121) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:52) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:165) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at java.lang.Thread.run(Thread.java:832) [?:?]
    Suppressed: java.io.IOException: Stream closed
        at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:442) ~[?:?]
        at java.io.OutputStream.write(OutputStream.java:162) ~[?:?]
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
        at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:321) ~[?:?]
        at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:325) ~[?:?]
        at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:159) ~[?:?]
        at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251) ~[?:?]
        at java.io.BufferedWriter.flush(BufferedWriter.java:257) ~[?:?]
        at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.notifyEndOfStream(DefaultAirbyteDestination.java:107) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:120) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:121) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:52) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:165) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
        at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.RuntimeException: java.io.IOException: Broken pipe
    at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:234) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
    ... 1 more
Caused by: java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method) ~[?:?]
    at java.io.FileOutputStream.write(FileOutputStream.java:347) ~[?:?]
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
    at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
    at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:312) ~[?:?]
    at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:290) ~[?:?]
    at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:131) ~[?:?]
    at java.io.OutputStreamWriter.write(OutputStreamWriter.java:208) ~[?:?]
    at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
    at java.io.BufferedWriter.write(BufferedWriter.java:233) ~[?:?]
    at java.io.Writer.write(Writer.java:249) ~[?:?]
    at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:99) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:50) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:219) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
    ... 1 more
2021-09-23 00:57:00 INFO () DefaultAirbyteSource(cancel):148 - Cancelled source process!
2021-09-23 00:57:00 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):203 - Interrupting worker thread...
2021-09-23 00:57:00 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):206 - Cancelling completable future...
2021-09-23 00:57:00 INFO () DefaultReplicationWorker(run):172 - sync summary: io.airbyte.config.ReplicationAttemptSummary@7fcca432[status=cancelled,recordsSynced=9291114,bytesSynced=3611597551,startTime=1632333550297,endTime=1632358620319]
2021-09-23 00:57:00 WARN () CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):71 - Job either timeout-ed or was cancelled.
2021-09-23 00:57:00 INFO () DefaultReplicationWorker(run):179 - Source output at least one state message
2021-09-23 00:57:00 INFO () TemporalAttemptExecution(get):135 - Stopping cancellation check scheduling...
2021-09-23 00:57:00 WARN () POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=65f03f19-5431-3a5c-aa8d-b602e372f722, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
    at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$2(TemporalAttemptExecution.java:209) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:70) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:212) ~[io.airbyte-airbyte-workers-0.29.21-alpha.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
    at java.lang.Thread.run(Thread.java:832) [?:?]
2021-09-23 00:57:00 INFO () DefaultReplicationWorker(run):185 - State capture: Updated state to: Optional[io.airbyte.config.State@257e83ea[state={"cdc":false,"streams":[{"stream_name":"accounts","stream_namespace":"public","cursor_field":["account_id"],"cursor":"180017473"},{"stream_name":"payment_methods","stream_namespace":"public","cursor_field":["payment_method_id"],"cursor":"1777153"},{"stream_name":"payment_refunds","stream_namespace":"public","cursor_field":["payment_refund_id"],"cursor":"58619"},{"stream_name":"payment_transaction_errors","stream_namespace":"public","cursor_field":["payment_transaction_error_id"],"cursor":"195999"},{"stream_name":"payment_transactions","stream_namespace":"public","cursor_field":["payment_transaction_id"]},{"stream_name":"user_subscription_transactions","stream_namespace":"public","cursor_field":["user_subscription_transaction_id"]},{"stream_name":"user_subscriptions","stream_namespace":"public","cursor_field":["user_subscription_id"]}]}]]
2021-09-23 00:57:00 WARN () CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):71 - Job either timeout-ed or was cancelled.
Phlair commented 3 years ago

I think the key log lines from above might be:

LineGobbler(voidCall):85 - Exception in thread "main" java.sql.SQLException: [Amazon](600001) The server closed the connection. 
2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 - at com.amazon.support.channels.TLSSocketChannel.read(Unknown Source) 
2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 - Caused by: com.amazon.support.exceptions.GeneralException: [Amazon](600001) The server closed the connection.

So this is possibly a Redshift destination issue.
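A minimal sketch of how the ERROR-level lines could be pulled out of a long sync log like the one above. The regular expression assumes the line shape seen in this thread (`timestamp LEVEL () Origin - message`); the function name `error_lines` is made up for illustration and is not part of Airbyte.

```python
import re

# Assumed line shape, taken from the log in this thread:
#   "2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 - message"
LOG_LINE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+) \(\) (?P<origin>\S+) - (?P<message>.*)$"
)


def error_lines(lines):
    """Yield (timestamp, origin, message) for every ERROR-level log line.

    Lines that do not start with a timestamp (for example raw stack-trace
    frames) do not match the pattern and are skipped.
    """
    for line in lines:
        match = LOG_LINE.match(line.rstrip("\n"))
        if match and match.group("level") == "ERROR":
            yield match.group("ts"), match.group("origin"), match.group("message")


sample = [
    "2021-09-22 21:19:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9291000",
    '2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.sql.SQLException: [Amazon](600001) The server closed the connection.',
    "2021-09-22 21:24:02 ERROR () LineGobbler(voidCall):85 -     at com.amazon.support.channels.TLSSocketChannel.read(Unknown Source)",
]

for ts, origin, message in error_lines(sample):
    print(ts, origin, message.strip())
```

To scan a downloaded log file instead of the sample list, an open file object can be passed directly, since the function only iterates over lines.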

sherifnada commented 3 years ago

@kailashjoshi018 Are you using inserts or bulk loading? Does the issue persist if you use bulk loading?

kailashjoshi018 commented 3 years ago

@sherifnada - I simply created the connection and started the sync in Airbyte, and ran into this issue. Is there another way to do it in Airbyte?

sherifnada commented 3 years ago

@kailashjoshi018 The Redshift destination allows bulk loading. Do you see that option in the UI?

kailashjoshi018 commented 3 years ago

@sherifnada - I only see the S3 configuration inputs in the UI; nothing is labeled as a bulk option. Is that what you are referring to? If so, could you help me understand how it works? My source is Postgres: to use the bulk option, do I first have to upload the data to S3 myself and then run the bulk load? If so, how does this work incrementally? After the bulk load, can I use inserts to pick up the incremental data from Postgres, or does it always have to go through S3? Your help is much appreciated.

sherifnada commented 3 years ago

@kailashjoshi018 The docs here describe it: https://docs.airbyte.io/integrations/destinations/redshift

All you need to do is give Airbyte credentials for an S3 bucket, and the connector will take care of the rest automatically. This approach is recommended for long-running or high-scale jobs.

Does this help clarify?
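For readers unfamiliar with S3-staged bulk loading, here is a rough sketch of the kind of statement involved: data files are first written to S3, and Redshift then loads them with a single `COPY` instead of many `INSERT` batches. This is an illustration only, not the connector's actual code. The helper `build_copy_statement`, the bucket name, the key prefix, and the IAM role ARN are all hypothetical, and the file format (gzipped CSV) and the use of `IAM_ROLE` rather than access keys are assumptions.

```python
def build_copy_statement(schema, table, bucket, prefix, iam_role_arn):
    """Return a Redshift COPY statement that loads gzipped CSV files from S3.

    Hypothetical helper for illustration; it only builds the SQL text and
    does not connect to Redshift or S3.
    """

    def quote_ident(name):
        # Double any embedded double quotes inside a quoted identifier.
        return '"' + name.replace('"', '""') + '"'

    def quote_literal(value):
        # Double any embedded single quotes inside a string literal.
        return "'" + value.replace("'", "''") + "'"

    s3_uri = f"s3://{bucket}/{prefix}"
    return (
        f"COPY {quote_ident(schema)}.{quote_ident(table)}\n"
        f"FROM {quote_literal(s3_uri)}\n"
        f"IAM_ROLE {quote_literal(iam_role_arn)}\n"
        "FORMAT AS CSV GZIP;"
    )


print(
    build_copy_statement(
        schema="users",                  # schema name seen in the log above
        table="_airbyte_raw_accounts",   # raw table name seen in the log above
        bucket="example-staging-bucket",  # hypothetical bucket name
        prefix="airbyte/accounts/",       # hypothetical key prefix
        iam_role_arn="arn:aws:iam::123456789012:role/example-redshift-copy",  # hypothetical role
    )
)
```

Because the load runs inside Redshift from staged files, it does not depend on a single JDBC connection staying open through millions of row inserts, which is plausibly why it is the recommended path for large initial syncs.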

kailashjoshi018 commented 3 years ago

@sherifnada thanks for the information. I was able to set up and start the job. One final question on this, though: my job is also failing while reading the data from Postgres, with the error below:

2021-09-24 01:38:10 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery
2021-09-24 01:38:10 ERROR () LineGobbler(voidCall):85 - Detail: User query might have needed to see row versions that must be removed.
2021-09-24 01:38:10 ERROR () LineGobbler(voidCall):85 - Hint: In a moment you should be able to reconnect to the database and repeat your command.
2021-09-24 01:38:10 ERROR () LineGobbler(voidCall):85 - at io.airbyte.db.jdbc.JdbcUtils$1.tryAdvance(JdbcUtils.java:71)
2021-09-24 01:38:10 ERROR () LineGobbler(voidCall):85 - at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)

How does the bulk load work in this case?

sherifnada commented 3 years ago

@kailashjoshi018 This looks like an issue with the Postgres source: the server may be vacuuming records that the query still needs, as described here. Are you reading from a replica?
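The hint in the error above ("you should be able to reconnect to the database and repeat your command") suggests the failure is transient from the client's point of view. A minimal sketch of retrying only on that specific error follows. The names `is_recovery_conflict` and `run_with_retry` are hypothetical, the match is on the message text shown in the log, and nothing here is Airbyte's own behavior. Retrying does not remove the cause: on a standby, settings such as `max_standby_streaming_delay` or `hot_standby_feedback` are the usual knobs, and a retried long read starts over.

```python
import time

# Message text copied from the PSQLException in the log above.
RECOVERY_CONFLICT = "terminating connection due to conflict with recovery"


def is_recovery_conflict(error):
    """Return True if the exception text looks like a standby recovery conflict."""
    return RECOVERY_CONFLICT in str(error)


def run_with_retry(operation, attempts=3, delay_seconds=5.0, sleep=time.sleep):
    """Call operation(); retry only when it fails with a recovery conflict.

    Any other exception, or a recovery conflict on the last attempt,
    is re-raised unchanged.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as error:
            if not is_recovery_conflict(error) or attempt == attempts:
                raise
            sleep(delay_seconds)


# Stand-in for a real database read: fails twice, then succeeds.
calls = []


def flaky_read():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("FATAL: terminating connection due to conflict with recovery")
    return "rows"


# The sleep is replaced with a no-op so the demonstration finishes immediately.
print(run_with_retry(flaky_read, sleep=lambda seconds: None))
```

Matching on message text is fragile (it depends on server locale and version); it is used here only because the message is the one piece of information the log in this thread actually shows.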

kailashjoshi018 commented 3 years ago

@sherifnada - Yes, the issue came from the DB, due to very high load. I changed it and it is working as expected now. Thanks for your help, much appreciated!