databricks / iceberg-kafka-connect

Apache License 2.0

Can't set partition with a day(timestamp_field) partition transform. #220

Closed: almir-magazord closed this issue 8 months ago

almir-magazord commented 8 months ago

Hi! I'm adding a connector with the config below:

{
    "name": "kafka-to-s3-order",
    "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "topics": "order",
        "iceberg.control.commit.threads": 1,
        "iceberg.table.kafka.order.id-columns": "database,order_id",
        "iceberg.table.kafka.order.partition-by": "day(order_datetime),database",
        "iceberg.tables": "kafka.order",
        "iceberg.tables.cdc-field": "__op",
        "iceberg.tables.upsert-mode-enabled": true,
        "iceberg.tables.auto-create-enabled": true,
        "iceberg.tables.evolve-schema-enabled": true,
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3a://kafka-order-sink",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "write.metadata.delete-after-commit.enabled": true,
        "write.metadata.previous-versions-max": 1,
        "write.batch-size": 1000
    }
}

After POSTing this config, I get these messages in the log:

java.lang.IllegalArgumentException: Cannot find source column: (day(order_datetime))

2024-03-22 13:02:20,393 ERROR  ||  Unable to create partition spec [day(order_datetime)], table kafka.order will be unpartitioned   [io.tabular.iceberg.connect.data.IcebergWriterFactory]

In the schema registry, the field is defined like this:

{
    "name": "order_datetime",
    "type": {
        "type": "string",
        "connect.version": 1,
        "connect.name": "io.debezium.time.ZonedTimestamp"
    }
}

Example: "order_datetime":"2023-10-31T02:52:46Z"

What could be wrong with this config setting?

Thanks!

almir-magazord commented 8 months ago

After some research, we found that a TimestampConverter transform was needed to convert the field from a string into a date:

"transforms": "timestampConverter",
"transforms.timestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestampConverter.field": "order_datetime",
"transforms.timestampConverter.target.type": "Date",
"transforms.timestampConverter.format": "yyyy-MM-dd",

It worked!! :smile:
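
For anyone landing here later, this is a sketch of how the combined connector config might look. It assumes the transform keys go inside the "config" object next to the other settings, which is where Kafka Connect normally reads transform settings from. All names and values are copied from the two posts above. It has not been re-tested here.

```json
{
    "name": "kafka-to-s3-order",
    "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "topics": "order",
        "transforms": "timestampConverter",
        "transforms.timestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestampConverter.field": "order_datetime",
        "transforms.timestampConverter.target.type": "Date",
        "transforms.timestampConverter.format": "yyyy-MM-dd",
        "iceberg.control.commit.threads": 1,
        "iceberg.table.kafka.order.id-columns": "database,order_id",
        "iceberg.table.kafka.order.partition-by": "day(order_datetime),database",
        "iceberg.tables": "kafka.order",
        "iceberg.tables.cdc-field": "__op",
        "iceberg.tables.upsert-mode-enabled": true,
        "iceberg.tables.auto-create-enabled": true,
        "iceberg.tables.evolve-schema-enabled": true,
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3a://kafka-order-sink",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "write.metadata.delete-after-commit.enabled": true,
        "write.metadata.previous-versions-max": 1,
        "write.batch-size": 1000
    }
}
```

With "target.type" set to "Date", the time-of-day part of order_datetime is dropped before the record reaches the sink, so this only suits cases where day-level precision in that column is enough.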

shift-alt-del commented 8 months ago

@almir-magazord Thanks for the post, it was a great help!