tabular-io / iceberg-kafka-connect

Apache License 2.0

Should S3 503 errors be unrecoverable failures? #257

Open logorrheic opened 1 month ago

logorrheic commented 1 month ago

This report is similar to https://github.com/tabular-io/iceberg-kafka-connect/issues/231: we see these task failures frequently, often daily or weekly. Rather than request more configuration parameters, I'd like to question whether these responses really are unrecoverable. If the task could retry continuously (with some back-off period), that would suit us better.

Here's a recent stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 503, Request ID: 9ABC7501F237F78E, Extended Request ID: azAj7kH4HNPxpNLh+SrNqfTye4ToKiea7vUBWJJ8OLFsY7XJ/WAE3xLxWilsRDjny/Qb+bKTyX3GAK5BTjwjpYMRHb0ab26l)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
        at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
        at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
        at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
        at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10191)
        at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:438)
        at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:265)
        at org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
        at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204)
        at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257)
        at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
        at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.closeCurrent(BaseTaskWriter.java:314)
        at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.close(BaseTaskWriter.java:341)
        at org.apache.iceberg.io.PartitionedFanoutWriter.close(PartitionedFanoutWriter.java:70)
        at org.apache.iceberg.io.BaseTaskWriter.complete(BaseTaskWriter.java:96)
        at io.tabular.iceberg.connect.data.IcebergWriter.flush(IcebergWriter.java:130)
        at io.tabular.iceberg.connect.data.IcebergWriter.complete(IcebergWriter.java:145)
        at io.tabular.iceberg.connect.channel.Worker.lambda$committable$0(Worker.java:66)
        at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
        at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1787)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at io.tabular.iceberg.connect.channel.Worker.committable(Worker.java:66)
        at io.tabular.iceberg.connect.channel.CommitterImpl.sendCommitResponse(CommitterImpl.java:141)
        at io.tabular.iceberg.connect.channel.CommitterImpl.receive(CommitterImpl.java:134)
        at io.tabular.iceberg.connect.channel.CommitterImpl.lambda$commit$6(CommitterImpl.java:193)
        at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:135)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:125)
        at io.tabular.iceberg.connect.channel.CommitterImpl.commit(CommitterImpl.java:193)
        at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:43)
        at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:76)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        ... 11 more
        Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 1 failure: null (Service: S3, Status Code: 503, Request ID: 5C2EA85A0610B9FC, Extended Request ID: Aedb4kt5tR2O4qVs7GC7Iu2KVRIJkz03ILg5OGiBgWqj8/FFHg35TRS/CUjO5BYCuKn8YpbElxCXJ4eIjgYGa3kPqSn364HW)
        Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 2 failure: null (Service: S3, Status Code: 503, Request ID: 271EFD9FAA802957, Extended Request ID: fgm/fFckuFYE2Irk/INMpe54ibytJZvHbbZ32gU4m9CxeiI/qRZfImfnJx1FPstnUJGNgQb1W4YEHzK5/+AIc/QtokxJykKS)
        Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 3 failure: null (Service: S3, Status Code: 503, Request ID: 1852479BCEA45B70, Extended Request ID: C6v83Fsoxkj05pVkURAEwGAKJ0QH8mBNFl7qk2kqBRqLAaeKFONzzpvIAhlzn2n7dYE0zYc/OR4Lwn9zAUNDMwPO7nD4PnNl)

We're running connector 0.6.18 on Kafka 3.7.0.
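
To make the retry-with-back-off idea raised above concrete, here is a minimal Python sketch. It is not the connector's code: `TransientStorageError`, `retry_with_backoff`, and the delay values are hypothetical names and numbers chosen for illustration. The retry is deliberately bounded, so a persistent failure still surfaces to the caller.

```python
import random
import time


class TransientStorageError(Exception):
    """Hypothetical stand-in for a retryable failure such as an S3 503."""


def retry_with_backoff(operation, max_attempts=5, base_delay=0.5,
                       max_delay=20.0, sleep=time.sleep):
    """Call operation() until it succeeds or max_attempts is exhausted.

    Between attempts, wait a random time in [0, min(max_delay, base_delay * 2**n)]
    (exponential back-off with full jitter). The last error is re-raised so the
    caller can still fail the task.
    """
    for attempt in range(max_attempts):
        try:
            return operation()
        except TransientStorageError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the failure
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, ceiling))
```

The `sleep` parameter exists only so the waiting can be stubbed out in tests. In a real sink task the total wait would need to stay well below whatever timeouts apply to the task.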

tabmatfournier commented 1 month ago

I wonder if this can be solved with https://github.com/tabular-io/iceberg-kafka-connect/pull/233, as it's a failure handler at the end of the day, with pluggable classes for dispatching back to the connector after an exception happens. Users are meant to write their own handler and catch the exceptions that matter to them.

tabmatfournier commented 1 month ago

Also, you can't retry continuously: Kafka Connect will consider the task a zombie at some point.

Best practice is to die and have something continuously restart your Kafka Connect task.
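
One way to get that "something" is a small watchdog that polls the Kafka Connect REST API and restarts any task reported as FAILED. The sketch below is only an illustration under assumptions: the function names are hypothetical, and it assumes the worker's REST endpoint is reachable without authentication. It relies on the standard `GET /connectors/{name}/status` and `POST /connectors/{name}/tasks/{id}/restart` endpoints.

```python
import json
import urllib.request


def failed_task_ids(status):
    """Return the ids of tasks whose state is FAILED in a connector status document."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_failed_tasks(base_url, connector):
    """Fetch one connector's status and restart each FAILED task.

    base_url is the Connect worker's REST address (assumed, e.g. http://localhost:8083).
    Returns the list of task ids for which a restart was requested.
    """
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        status = json.load(resp)
    restarted = []
    for task_id in failed_task_ids(status):
        req = urllib.request.Request(
            f"{base_url}/connectors/{connector}/tasks/{task_id}/restart",
            method="POST",
        )
        urllib.request.urlopen(req).close()
        restarted.append(task_id)
    return restarted
```

Running `restart_failed_tasks` on a schedule (cron, a sidecar loop, or similar) would approximate the continuous restart described above. It does nothing to address the underlying 503s.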

okayhooni commented 2 weeks ago

Hello @logorrheic,

I resolved a similar issue by applying the adaptive retry mode in the AWS SDK! I think it would help in your case too:

AWS_RETRY_MODE : adaptive # DEFAULT: legacy
AWS_MAX_ATTEMPTS : 10 # DEFAULT: 3