tabular-io / iceberg-kafka-connect


Upsert on structs #133

Closed haripriyarhp closed 8 months ago

haripriyarhp commented 9 months ago

Hi again, another question: does upsert work on columns inside a struct column? For example, if the column to be upserted is present inside a struct in the table, does it still work? Thanks, Haripriya

bryanck commented 9 months ago

Yes, upserts work by replacing the entire row, so the source record should contain all values.

haripriyarhp commented 9 months ago

Hi, thanks for the response. I tried UPSERT using this connector and I get the error below. You mentioned the source record should contain all values; does that mean there should be no nulls?

org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: topic_name, partition, 0, offset: 0
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:74)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:201)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:199)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:188)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:175)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:149)
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:188)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:588)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
    at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)

bryanck commented 9 months ago

Apologies, maybe I didn't word that well. The row will be deleted and then a new row inserted with the values from the Kafka record, no values will be carried over from the deleted row. You can have nulls if the table fields are optional. When using upserts the ID field(s) must be present in the record.
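
As a sketch, a standalone-worker properties file with upsert mode enabled might look like the following. This is illustrative only: the property names are taken from the connector README as I recall them, and the connector name, topic, table, and `id` key column are placeholders.

```properties
# Sketch of a sink config with upsert mode enabled (placeholder names).
name=iceberg-sink-upsert-example
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
topics=topic_name
iceberg.tables=db.my_table
# Upsert mode: each incoming record deletes the existing row with the same
# key (via an equality delete) and inserts the new row in full.
iceberg.tables.upsert-mode-enabled=true
# Identifier (key) column(s); must be present and non-null in every record.
iceberg.tables.default-id-columns=id
```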

haripriyarhp commented 9 months ago

okay, thanks! But I get the error below whenever I try UPSERT. I assumed it was because of the structs, but even after I removed the struct and map columns, I still get the error.

Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: topic_name , partition, 0, offset: 0
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:74)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:201)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:199)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:188)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:175)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    ... 11 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:149)
    at io.tabular.iceberg.connect.data.RecordProjection.get(RecordProjection.java:188)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
    at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:588)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
    at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:388)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:371)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
    at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.deleteKey(BaseTaskWriter.java:186)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:77)
    at io.tabular.iceberg.connect.data.UnpartitionedDeltaWriter.write(UnpartitionedDeltaWriter.java:30)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:36)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:66)
    ... 19 more

bryanck commented 9 months ago

If you can give me a way to reproduce this, I can investigate further. To me it looks like equality fields are missing from your data.

xq2005 commented 9 months ago

@haripriyarhp this issue is the same as mine. You can refer to issue #128 and try that.

bryanck commented 9 months ago

I found an issue when the identity columns aren't set on a table, unrelated to struct columns. I'll have a PR to fix this soon.

bryanck commented 9 months ago

This PR should fix the issue, thanks for reporting it.

haripriyarhp commented 8 months ago

@bryanck Thanks for the update. It solved the above error, but I got a new one. I tested the latest version with the combinations below.

| Partition enabled | Auto-create table | Upsert enabled | Table in Athena |
|---|---|---|---|
| no | yes | yes | No data retrieved |
| no | yes | no | Table created with all columns visible |
| yes | yes | no | Table created with all columns visible |
| yes | yes | yes | No data retrieved |

If only INSERTs are performed, then all works great: the table is created and data is displayed when querying the Athena table. If UPSERT is enabled, I get the warning below, and no data is displayed when the Athena table is queried even though some data is written to S3.

2023-10-30 22:06:10,809 WARN [kafka-connector-tabular-iceberg-12345|task-0] Commit failed, will try again next cycle (io.tabular.iceberg.connect.channel.Coordinator) [iceberg-coord]
java.lang.IllegalArgumentException: Cannot write delete files in a v1 table
    at org.apache.iceberg.ManifestFiles.writeDeleteManifest(ManifestFiles.java:202)
    at org.apache.iceberg.SnapshotProducer.newDeleteManifestWriter(SnapshotProducer.java:501)
    at org.apache.iceberg.MergingSnapshotProducer.lambda$newDeleteFilesAsManifests$18(MergingSnapshotProducer.java:1020)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at org.apache.iceberg.MergingSnapshotProducer.newDeleteFilesAsManifests(MergingSnapshotProducer.java:1016)
    at org.apache.iceberg.MergingSnapshotProducer.prepareDeleteManifests(MergingSnapshotProducer.java:1002)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:873)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:220)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:370)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:368)
    at io.tabular.iceberg.connect.channel.Coordinator.commitToTable(Coordinator.java:237)
    at io.tabular.iceberg.connect.channel.Coordinator.lambda$doCommit$1(Coordinator.java:147)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
    at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

bryanck commented 8 months ago

You'll need to set a property so the table is auto-created as a V2 table: set `iceberg.tables.auto-create-props.format-version` to 2. When the sink is updated to use Iceberg 1.4.x, V2 will become the default, so you won't need to set that.
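
For example, an auto-create configuration along these lines should produce a V2 table. Only the `format-version` line is the setting mentioned above; the table name and the other property names are the same illustrative placeholders as in the earlier sketch.

```properties
# Sketch of auto-create settings for a V2 table (placeholder table name).
iceberg.tables=db.my_table
iceberg.tables.auto-create-enabled=true
iceberg.tables.upsert-mode-enabled=true
# Entries under iceberg.tables.auto-create-props.* are applied as table
# properties at creation time; format version 2 is required for the
# equality delete files that upsert mode writes.
iceberg.tables.auto-create-props.format-version=2
```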

bryanck commented 8 months ago

Sink version 0.6.0 includes Iceberg 1.4.2, which uses format V2 by default.