After I deployed this sink connector on our highest-traffic application log topic, I noticed that every morning, when people in our country wake up and start their activities, a couple of tasks of this HOT connector go to FAILED with an S3 503 error whose message is "Please reduce your request rate."
Although I have already enabled Iceberg's object store file layout option for all the sink tables, the issue persists almost every day, and only on that HOT log sink out of all the sinks I have deployed.
When I manually restart the tasks that failed with this 503 error, all tasks become healthy again with no issue.
As you know, this is related to S3's adaptive scaling of request capacity based on observed request patterns, but there is no way to pre-warm it for our daily log influx pattern.
So I read the AWS SDK documentation on retry behavior, and I guess this issue could be alleviated by configuring max_attempts and retry_mode.
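From that documentation, these settings can go into the shared AWS config file; a minimal sketch of what I have in mind, assuming the Connect workers' S3 client actually reads the shared config file in our setup (I have not verified this, and the values are only a guess for our traffic spike):

    # ~/.aws/config on each Kafka Connect worker
    [default]
    retry_mode = adaptive
    max_attempts = 10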
How could I configure these options for this Tabular sink connector? Is it just a matter of setting environment variables on all the Kafka Connect workers?
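If environment variables are indeed the way, I was thinking of something like this on every worker, using the standard AWS SDK variables (values are a guess, not a recommendation):

    # set on every Kafka Connect worker, then restart the worker process
    export AWS_RETRY_MODE=adaptive
    export AWS_MAX_ATTEMPTS=10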
And could I configure these options only for the specific HOT connector, rather than for all the Iceberg sink connectors deployed on the same Kafka Connect cluster?
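If a per-connector override is possible, I imagine it would go through the connector's iceberg.catalog.* passthrough properties; a rough sketch of what I mean, assuming the bundled Iceberg version exposes an S3 retry property such as s3.retry.num-retries (a name I have not verified):

    # hypothetical addition to only the HOT connector's config (property name unverified)
    # iceberg.catalog.* entries are passed through to the Iceberg catalog / S3FileIO
    iceberg.catalog.s3.retry.num-retries=10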
(Example of the full stack trace; the SDK retried only 3 times, which is the default AWS SDK behavior.)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your request rate. (Service: S3, Status Code: 503, Request ID: DSMTJRN24JKEYKRW, Extended Request ID: IyJzBd0/Ds5BEy0v3b7XpPtEZuDUcw++JWaTPV8dr10oKzT+ySXS3Lz/POIKLKO9asCy8DqhgSY=) at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156) at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108) at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85) at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95) at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:270) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:52) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:37) at 
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:196) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:8783) at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:438) at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:265) at org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38) at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204) at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257) at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82) at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.closeCurrent(BaseTaskWriter.java:314) at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.close(BaseTaskWriter.java:341) at 
org.apache.iceberg.io.PartitionedFanoutWriter.close(PartitionedFanoutWriter.java:70) at org.apache.iceberg.io.BaseTaskWriter.complete(BaseTaskWriter.java:96) at io.tabular.iceberg.connect.data.IcebergWriter.flush(IcebergWriter.java:130) at io.tabular.iceberg.connect.data.IcebergWriter.complete(IcebergWriter.java:145) at io.tabular.iceberg.connect.channel.Worker.lambda$receive$2(Worker.java:112) at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273) at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) at io.tabular.iceberg.connect.channel.Worker.receive(Worker.java:112) at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:129) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:119) at io.tabular.iceberg.connect.channel.Worker.process(Worker.java:101) at io.tabular.iceberg.connect.IcebergSinkTask.processControlEvents(IcebergSinkTask.java:171) at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:158) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583) ... 11 more Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 1 failure: Please reduce your request rate. (Service: S3, Status Code: 503, Request ID: 3CT98DM2JF23P1RG, Extended Request ID: bKcNHZWBSpBWlX8nCEuHJ9DV+rJtr7arV0d5s5txceaNLKRAoRtSMZpSL/0ne1a9yu6SI3/eALA=) Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 2 failure: Please reduce your request rate. (Service: S3, Status Code: 503, Request ID: WAX7RH1HBN0B6J16, Extended Request ID: nD+dVz/XfMM28C6jtVCLGLrfb4R2hAIpv7GMI8h+9xShI6UBCPtFCyY3cFLL7r+05vZRTVBhj2k=) Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 3 failure: Please reduce your request rate. (Service: S3, Status Code: 503, Request ID: WAX8DSYYMFKFQQKX, Extended Request ID: od5dBGaQwcpV2AlAlmnOcFmdc/uXY/WETBgNJNXB/QJwP99QTFySVQqPKhUEF3Y/eIEdChSQ36c=)