databricks / iceberg-kafka-connect


exception due to drop column #261

Open eshishki opened 5 months ago

eshishki commented 5 months ago

We ingest Avro data from Kafka produced by Debezium.

Debezium dropped an optional column; according to the schema registry this was a backward-compatible change. This tool did not adjust the Iceberg table schema, which is alright.

But during initialization of the Iceberg writer we only look at the Iceberg schema, not at the message's Avro schema, so we fail when accessing a field that no longer exists in the Avro record.

Caused by: java.lang.NullPointerException
        at org.apache.iceberg.parquet.ParquetValueWriters$CollectionWriter.elements(ParquetValueWriters.java:477)
        at org.apache.iceberg.parquet.ParquetValueWriters$CollectionWriter.elements(ParquetValueWriters.java:469)
        at org.apache.iceberg.parquet.ParquetValueWriters$RepeatedWriter.write(ParquetValueWriters.java:426)
        at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
        at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
        at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
        at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
        at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
        at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
        at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.write(BaseTaskWriter.java:146)
        at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:82)
        at io.tabular.iceberg.connect.data.UnpartitionedDeltaWriter.write(UnpartitionedDeltaWriter.java:31)
        at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:37)
        at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:69)
fqtab commented 5 months ago

That should work. Did you remember to set "iceberg.tables.evolve-schema-enabled" : "true" in your connector config to enable automatic schema evolution? If that doesn't work, please share your connector config.
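For illustration, a minimal sketch of what such a connector config might look like; the connector name, topic, and table are placeholders, and only iceberg.tables.evolve-schema-enabled is the property discussed here:

{
    "name": "iceberg-sink",
    "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "topics": "db.public.users",
        "iceberg.tables": "default.users",
        "iceberg.tables.evolve-schema-enabled": "true"
    }
}

(Catalog connection properties are omitted; they depend on your deployment.)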

eshishki commented 5 months ago

I have evolve-schema-enabled set to true, and the schema change in question is the removal of an array of strings.

{
    "name": "onboarding",
    "type": {
        "type": "array",
        "items": [
            "null",
            "string"
        ]
    }
}
fqtab commented 5 months ago

In that case, I'm confused.

{
    "name": "onboarding",
    "type": {
        "type": "array",
        "items": [
            "null",
            "string"
        ]
    }
}

That onboarding field is not optional in your schema: the elements of the onboarding array are optional, but the array itself is not. The schema registry should not have allowed you to drop a required array column.

This is what I think it should look like if the onboarding array itself had been configured as optional:

{
      "name": "onboarding",
      "type": [
            "null",
            {"type":"array", "items": ["null", "string"] }
      ]
}

The correct way to do this would have been to first make the array column optional. One message with that schema would have caused the connector to evolve the table to make that field optional. Then you could have dropped the column without running into this issue.
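For illustration, a sketch of that intermediate schema; the record name and single-field layout are placeholders, and the "default": null is an assumption on my part (the usual companion of a nullable union field in Avro):

{
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "onboarding",
            "type": [
                "null",
                {"type": "array", "items": ["null", "string"]}
            ],
            "default": null
        }
    ]
}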

Regardless, you should be able to fix your immediate problem by manually marking that field as optional in your Iceberg table.
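For example, a minimal sketch using the Iceberg Java API; the table identifier is a placeholder, and constructing the Catalog instance depends on your deployment:

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class MakeColumnOptional {
    // `catalog` is whichever Catalog implementation you use (Hive, REST, Hadoop, ...).
    static void makeOnboardingOptional(Catalog catalog) {
        // "db.events" is a hypothetical table identifier; substitute your own.
        Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
        table.updateSchema()
            .makeColumnOptional("onboarding")
            .commit();
    }
}

With Spark and the Iceberg SQL extensions, the equivalent DDL is ALTER TABLE db.events ALTER COLUMN onboarding DROP NOT NULL.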

eshishki commented 5 months ago

I was as surprised as you are to see that the schema registry allowed Debezium to make this change under the backward compatibility policy.

I will double-check this behavior separately.

gaydba commented 3 months ago

Yep. In the database there was an ALTER TABLE ... RENAME COLUMN.

Debezium successfully created a new schema with backward compatibility.

The Iceberg sink fails with Caused by: java.lang.NullPointerException.