tabular-io / iceberg-kafka-connect

Apache License 2.0

Failure to ingest multiple topics into one table with one connector #129

Closed okayhooni closed 8 months ago

okayhooni commented 9 months ago

The Iceberg table format uses an optimistic concurrency model for table writes, with an atomic metadata swap on commit.

However, I expected this connector to work correctly when the source topics are all declared in a single connector spec (i.e., NOT multiple connectors writing to the same target table):

config:
  topics: "topic_a,topic_b,topic_c"

But the results of my test were different from what I expected.

Is there any plan to support multi-topic ingestion into one table with only one connector?
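For reference, a minimal multi-topic sink configuration along these lines might look like the following. This is a sketch: the connector, topic, table, and catalog names are placeholders, not values from this issue, and the exact property set should be checked against the connector README for your release.

```json
{
  "name": "iceberg-multi-topic-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "topic_a,topic_b,topic_c",
    "iceberg.tables": "my_db.my_table",
    "iceberg.catalog.type": "glue",
    "iceberg.tables.auto-create-enabled": "true"
  }
}
```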

[ERROR LOG]

2023-10-18 15:40:30,778 WARN [iceberg-tabular-sink-connector-avro-schema-poc-v8|task-0] Retrying task after failure: Cannot commit iceberg.temp_beta.poc_tabular_kafka_iceberg_connector_avro_schema_poc_v8 because Glue detected concurrent update (org.apache.iceberg.util.Tasks) [task-thread-iceberg-tabular-sink-connector-avro-schema-poc-v8-0]
org.apache.iceberg.exceptions.CommitFailedException: Cannot commit iceberg.temp_beta.poc_tabular_kafka_iceberg_connector_avro_schema_poc_v8 because Glue detected concurrent update
    at org.apache.iceberg.aws.glue.GlueTableOperations.handleAWSExceptions(GlueTableOperations.java:355)
    at org.apache.iceberg.aws.glue.GlueTableOperations.doCommit(GlueTableOperations.java:180)
    at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
    at org.apache.iceberg.SchemaUpdate.commit(SchemaUpdate.java:442)
    at io.tabular.iceberg.connect.data.SchemaUtils.commitSchemaUpdates(SchemaUtils.java:135)
    at io.tabular.iceberg.connect.data.SchemaUtils.lambda$applySchemaUpdates$0(SchemaUtils.java:92)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at io.tabular.iceberg.connect.data.SchemaUtils.applySchemaUpdates(SchemaUtils.java:92)
    at io.tabular.iceberg.connect.data.IcebergWriter.convertToRow(IcebergWriter.java:92)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:62)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:201)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:199)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:188)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:175)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:145)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: software.amazon.awssdk.services.glue.model.ConcurrentModificationException: Update table failed due to concurrent modifications. (Service: Glue, Status Code: 400, Request ID: 2321ccc1-9bc0-48f4-a5e8-8bf8fafaa71d)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
    at software.amazon.awssdk.services.glue.DefaultGlueClient.updateTable(DefaultGlueClient.java:15898)
    at org.apache.iceberg.aws.glue.GlueTableOperations.persistGlueTable(GlueTableOperations.java:331)
    at org.apache.iceberg.aws.glue.GlueTableOperations.doCommit(GlueTableOperations.java:160)
    ... 30 more
bryanck commented 9 months ago

Multiple topic ingestion is supported, via either the topics or topics.regex configuration. The message you posted is a warning that can be safely ignored. Suppressing the stack trace will require a change in Iceberg itself, so for now you'll need to configure your logging to suppress it, if desired.
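One way to do this, assuming a stock Kafka Connect worker with its Log4j 1.x configuration (an assumption, not stated in the issue), is to raise the log level for the class that emits the retry warning, which per the log above is org.apache.iceberg.util.Tasks:

```properties
# In the Connect worker's Log4j config (e.g. connect-log4j.properties):
# raise the threshold for Iceberg's retry helper so the retried
# CommitFailedException warnings (and their stack traces) are not logged.
log4j.logger.org.apache.iceberg.util.Tasks=ERROR
```

Note this also hides any other WARN-level output from that class, so it is a blunt instrument rather than a targeted fix.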

bryanck commented 9 months ago

I'll investigate alternatives for suppressing the stack trace in the meantime.

okayhooni commented 9 months ago

@bryanck

I really appreciate your quick answer.

But the connector I deployed never reached the READY state. (It was deployed as a custom resource via the Strimzi Kubernetes operator.)

(The only difference between v7 and v8 is the mapping of multiple topics.)

(screenshot attached)
bryanck commented 9 months ago

Are there any other errors? The one you posted is something that will be retried. Also, what version are you using?

okayhooni commented 9 months ago

I couldn't find any other errors.

I'm using the iceberg-kafka-connect-runtime-hive-0.5.7 version.

The Avro schemas of the topics are different. I expected the auto-created table schema to contain all the fields from all the topics, with null injected into any field that doesn't exist in a given topic's messages.

okayhooni commented 9 months ago

Oh, I found another error, shown below:

2023-10-18 15:40:32,310 ERROR [iceberg-tabular-sink-connector-avro-schema-poc-v8|task-1] WorkerSinkTask{id=iceberg-tabular-sink-connector-avro-schema-poc-v8-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: An error occurred converting record, topic: order.beta.streaming.pay-result.avro, partition, 1, offset: 25908 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-iceberg-tabular-sink-connector-avro-schema-poc-v8-1]
org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: order.beta.streaming.pay-result.avro, partition, 1, offset: 25908
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:73)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:201)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:199)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:188)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:175)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:145)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Boolean.booleanValue()" because "value" is null
    at org.apache.iceberg.parquet.ColumnWriter$1.write(ColumnWriter.java:34)
    at org.apache.iceberg.parquet.ColumnWriter$1.write(ColumnWriter.java:31)
    at org.apache.iceberg.parquet.ParquetValueWriters$PrimitiveWriter.write(ParquetValueWriters.java:131)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:139)
    at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
    at org.apache.iceberg.io.UnpartitionedWriter.write(UnpartitionedWriter.java:42)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:65)
    ... 19 more
bryanck commented 9 months ago

This is caused by a null value in a required field. Throwing an NPE isn't ideal, but fixing that will require an update to Iceberg; I'll bring it up with some of the Iceberg folks.

okayhooni commented 9 months ago

This is caused by a null value in a required field. Throwing an NPE isn't ideal but that will require an update to Iceberg, I'll bring that up w/ some of the Iceberg folks.

Thank you!!!!

okayhooni commented 9 months ago

I bypassed this issue by creating the table beforehand, using a STRING field instead of the STRUCT field (which had a NOT NULL constraint).
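As a sketch of this workaround, the pre-created table could be declared with Spark SQL roughly as follows. The catalog, table, and column names here are illustrative assumptions, not taken from the issue; the point is that the problematic field is typed as STRING and left nullable so the writer never hits a required-field null.

```sql
-- Hypothetical Spark SQL DDL; names are illustrative, not from the issue.
CREATE TABLE glue_catalog.temp_beta.poc_table (
  order_id   STRING,
  -- declared as a nullable STRING instead of a STRUCT with required
  -- subfields, so records that omit it no longer trigger the NPE on write
  pay_result STRING,
  created_at TIMESTAMP
)
USING iceberg;
```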

THANK YOU @bryanck !!

okayhooni commented 9 months ago

@bryanck

How about adding a behavior.on.null.values option to this Iceberg connector, like the ones in other major sink connectors such as the S3 sink connector or the Elasticsearch sink connector?

I would really appreciate it if that option were added!

bryanck commented 9 months ago

Thanks for the links, I'll take a look. One thing that could be a problem when combining Avro schemas is that if a field is marked as required in one schema, it will be added to the table as required, even though only one type of message might contain it. We may want an option to always create fields as optional for these cases.

bryanck commented 9 months ago

I opened this PR to add an option to force all columns to be optional during autocreate, which should help your case where you are combining schemas into a single table.
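For readers landing here later: this option did ship in subsequent connector releases. To the best of my recollection the property is named iceberg.tables.schema-force-optional, but treat that name as an assumption and verify it against the connector README for your version.

```properties
# Assumed property name; verify against the connector README for your release.
# When enabled, columns created during table auto-create and schema evolution
# are added as optional (nullable), so a field that is required in one topic's
# schema does not become required table-wide.
iceberg.tables.schema-force-optional=true
```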

okayhooni commented 8 months ago

I opened this PR to add an option to force all columns to be optional during autocreate, which should help your case where you are combining schemas into a single table.

Thank you very much..! :)