tabular-io / iceberg-kafka-connect

autoCreateTable does not use SinkRecord.keySchema to build the org.apache.iceberg.Schema, causing an exception when parsing delete operation records #128

Closed xq2005 closed 9 months ago

xq2005 commented 9 months ago

In the autoCreateTable function, the org.apache.iceberg.Schema is built without identifierFieldIds.

When a delete operation record is parsed, the following exception is thrown:

    Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:631)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: linux.pg_kafka.sourcedb.product, partition, 0, offset: 240
        at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:73)
        at io.tabular.iceberg.connect.channel.Worker.routeRecordDynamically(Worker.java:231)
        at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:186)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:174)
        at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:145)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        ... 10 more
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
        at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:149)
        at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:188)
        at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
        at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
        at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:588)
        at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:139)
        at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)
        at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:388)
        at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:371)
        at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
        at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.deleteKey(BaseTaskWriter.java:186)
        at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:77)
        at io.tabular.iceberg.connect.data.UnpartitionedDeltaWriter.write(UnpartitionedDeltaWriter.java:30)
        at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:36)
        at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:68)
        ... 16 more

bryanck commented 9 months ago

Can you post the record value causing this?

xq2005 commented 9 months ago

The fix is fairly direct: if the SinkRecord contains a keySchema, scan the valueSchema's structType for the matching fields and build identifierFieldIds from their field IDs. If iceberg.table.<table name>.id-columns is defined in the configuration, scan the structType for those columns instead. Otherwise leave the set empty.

    // Default: no identifier fields.
    Set<Integer> equalityFieldIds = ImmutableSet.of();

    // If the record has a key schema, map its fields to the value schema's field IDs.
    if (sample.keySchema() != null) {
      equalityFieldIds =
          sample.keySchema().fields().stream()
              .map(f -> structType.field(f.name()).fieldId())
              .collect(toSet());
    }

    // An explicit iceberg.table.<table name>.id-columns setting takes precedence.
    List<String> idCols = config.tableConfig(tableName).idColumns();
    if (!idCols.isEmpty()) {
      equalityFieldIds =
          idCols.stream()
              .map(colName -> structType.field(colName).fieldId())
              .collect(toSet());
    }

    org.apache.iceberg.Schema schema =
        new org.apache.iceberg.Schema(structType.fields(), equalityFieldIds);

The code in createTableWriter that handles iceberg.table.<table name>.id-columns can then be removed.
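For context, once identifierFieldIds is set at table creation, a writer can read the identifier fields straight from the table schema instead of re-resolving the id-columns property. A minimal sketch, assuming a tableSchema variable that holds the created table's schema:

    // Iceberg schemas expose their identifier fields directly,
    // so the writer no longer needs the id-columns lookup.
    Set<Integer> equalityFieldIds = tableSchema.identifierFieldIds();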

bryanck commented 9 months ago

The sink relies on the record value and makes no assumptions about the structure of the record key. You can use an SMT to copy the key to the value if you want.
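For anyone who needs this, a minimal sketch of such a transform, assuming Struct keys and values; the package, the class name CopyKeyToValue, and the target field name _key are illustrative, not part of the connector:

    package com.example.connect;

    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.transforms.Transformation;

    /** Hypothetical SMT that copies the record key into a "_key" field of the value. */
    public class CopyKeyToValue<R extends ConnectRecord<R>> implements Transformation<R> {

      @Override
      public R apply(R record) {
        // Only handle records with structured keys and values; pass everything else through.
        if (record.keySchema() == null || record.valueSchema() == null || record.value() == null) {
          return record;
        }
        Struct value = (Struct) record.value();

        // New value schema = original value fields plus a "_key" struct field.
        SchemaBuilder builder = SchemaBuilder.struct();
        record.valueSchema().fields().forEach(f -> builder.field(f.name(), f.schema()));
        builder.field("_key", record.keySchema());
        Schema newSchema = builder.build();

        // Copy the original value fields and attach the key under "_key".
        Struct newValue = new Struct(newSchema);
        record.valueSchema().fields().forEach(f -> newValue.put(f.name(), value.get(f)));
        newValue.put("_key", record.key());

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            newSchema,
            newValue,
            record.timestamp());
      }

      @Override
      public ConfigDef config() {
        return new ConfigDef();
      }

      @Override
      public void configure(Map<String, ?> configs) {}

      @Override
      public void close() {}
    }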

xq2005 commented 9 months ago

> Can you post the record value causing this?

I added one line to print the record content.

    SinkRecord{kafkaOffset=240, timestampType=CreateTime} ConnectRecord{topic='sourcedb.cdc.product', kafkaPartition=0, key=Struct{productid=1122}, keySchema=Schema{key.SOURCEDB.CDC.PRODUCT:STRUCT}, value=Struct{productid=1122,descriptn=kafka 3.5.1,location=techzone1,status=I,unitprice=2.00,unitcost=600.00,qtyonhand=2425,qtyalloc=50,qtyminord=2000,_cdc_op=D,_cdc_ts=Sat Oct 14 18:15:11 GMT 2023,_cdc_table=sourcedb.cdc.product,_cdc_key=Struct{productid=1122}}, valueSchema=Schema{value.SOURCEDB.CDC.PRODUCT:STRUCT}, timestamp=1697300112168, headers=ConnectHeaders(headers=)} (io.tabular.iceberg.connect.channel.Worker:180)

xq2005 commented 9 months ago

> The sink relies on the record value and makes no assumptions about the structure of the record key. You can use an SMT to copy the key to the value if you want.

Yes, an SMT is a good idea, but I would need to write a fairly complex transformer.

bryanck commented 9 months ago

The Debezium SMT in this repo does this; it copies the key to a field in the value. It isn't very complex.
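Wiring an SMT into the sink is just connector configuration. A sketch, using the hypothetical transform class from the earlier example rather than the repo's actual Debezium transform (see the repo's documentation for its real class name):

    transforms=copykey
    transforms.copykey.type=com.example.connect.CopyKeyToValue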

xq2005 commented 9 months ago

OK, I'll give it a try.