databricks / iceberg-kafka-connect

Upsert on structs #133

Closed haripriyarhp closed 1 year ago

haripriyarhp commented 1 year ago

Hi again. Another question: does upsert work on columns inside a struct column? For example, if the column to be upserted is nested inside a struct in the table, does it still work? Thanks, Haripriya

bryanck commented 1 year ago

Yes, upserts work by replacing the entire row; the source record should contain all values.

haripriyarhp commented 1 year ago

Hi, thanks for the response. I tried UPSERT using this connector and I get the error below. You mentioned the source record should contain all values; does that mean there should be no nulls?

org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: topic_name, partition, 0, offset: 0
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:74)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:201)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:199)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:188)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:175)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:149)
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:188)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:588)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
    at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)

bryanck commented 1 year ago

Apologies, maybe I didn't word that well. The row will be deleted and then a new row inserted with the values from the Kafka record; no values will be carried over from the deleted row. You can have nulls if the table fields are optional. When using upserts, the ID field(s) must be present in the record.
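For context, here is a minimal sketch of what an upsert-enabled sink configuration might look like. The upsert-mode-enabled and default-id-columns property names are based on the connector's documented settings, the topic, table, and id column names are placeholders, and catalog settings are omitted:

    {
      "name": "iceberg-sink-upsert-example",
      "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "topics": "topic_name",
        "iceberg.tables": "db.my_table",
        "iceberg.tables.upsert-mode-enabled": "true",
        "iceberg.tables.default-id-columns": "id"
      }
    }

In upsert mode the sink writes an equality delete keyed on the ID columns followed by the new row, which is why the ID fields must always be present in the record.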

haripriyarhp commented 1 year ago

Okay, thanks! But I get the below error whenever I try UPSERT. I assumed it was because of structs, but even after I removed the struct and map columns I still get the error:

Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: topic_name , partition, 0, offset: 0
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:74)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:201)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:199)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:188)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:175)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    ... 11 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:149)
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:188)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:588)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
    at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:388)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:371)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
    at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.deleteKey(BaseTaskWriter.java:186)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:77)
    at io.tabular.iceberg.connect.data.UnpartitionedDeltaWriter.write(UnpartitionedDeltaWriter.java:30)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:36)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:66)
    ... 19 more

bryanck commented 1 year ago

If you can give me a way to reproduce I can investigate further. To me it looks like missing equality fields in your data.

xq2005 commented 1 year ago

@haripriyarhp this issue is the same as mine. You can refer to issue #128 and try that.

bryanck commented 1 year ago

I found an issue when the identity columns aren't set on a table, unrelated to struct columns. I'll have a PR to fix this soon.

bryanck commented 1 year ago

This PR should fix this issue, thanks for reporting this.

haripriyarhp commented 1 year ago

@bryanck Thanks for the update. It solved the above error, but I got a new one. I tested the latest version with the combinations below.

| Partition Enabled | Auto create table | Upsert enabled | Table in Athena |
| --- | --- | --- | --- |
| no | yes | yes | No data retrieved |
| no | yes | no | Table created with all columns visible |
| yes | yes | no | Table created with all columns visible |
| yes | yes | yes | No data retrieved |

If only INSERTs are performed then all works great: the table is created and data is displayed when querying the Athena table. If UPSERT is enabled then I get the warning below, and no data is displayed when the Athena table is queried even though some data is written to S3.

2023-10-30 22:06:10,809 WARN [kafka-connector-tabular-iceberg-12345|task-0] Commit failed, will try again next cycle (io.tabular.iceberg.connect.channel.Coordinator) [iceberg-coord]
java.lang.IllegalArgumentException: Cannot write delete files in a v1 table
    at org.apache.iceberg.ManifestFiles.writeDeleteManifest(ManifestFiles.java:202)
    at org.apache.iceberg.SnapshotProducer.newDeleteManifestWriter(SnapshotProducer.java:501)
    at org.apache.iceberg.MergingSnapshotProducer.lambda$newDeleteFilesAsManifests$18(MergingSnapshotProducer.java:1020)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at org.apache.iceberg.MergingSnapshotProducer.newDeleteFilesAsManifests(MergingSnapshotProducer.java:1016)
    at org.apache.iceberg.MergingSnapshotProducer.prepareDeleteManifests(MergingSnapshotProducer.java:1002)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:873)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:220)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:370)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:368)
    at io.tabular.iceberg.connect.channel.Coordinator.commitToTable(Coordinator.java:237)
    at io.tabular.iceberg.connect.channel.Coordinator.lambda$doCommit$1(Coordinator.java:147)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
    at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

bryanck commented 1 year ago

You'll need to set a property to auto-create the table as a V2 table: set iceberg.tables.auto-create-props.format-version to 2. When the sink is updated to use Iceberg 1.4.x, V2 will become the default, so you won't need to set that.
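For example, an auto-create configuration along these lines should create new tables as format V2. The auto-create-enabled key and table name are illustrative assumptions; the format-version property is the one mentioned above:

    {
      "iceberg.tables": "db.my_table",
      "iceberg.tables.auto-create-enabled": "true",
      "iceberg.tables.auto-create-props.format-version": "2"
    }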

bryanck commented 1 year ago

Sink version 0.6.0 includes Iceberg 1.4.2, which uses format V2 by default.