getindata / kafka-connect-iceberg-sink


Add partitioning column configuration setting #30

Closed johanhenriksson closed 1 year ago

johanhenriksson commented 1 year ago

Description

Adds a configuration setting that allows the user to specify which column is used for partitioning. The column must be a unix millisecond timestamp.
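
For example, a hypothetical standalone-style connector config using this setting might look like the following (the property name matches what is discussed later in this thread; the connector name, topic, and field name are just placeholders):

    # Placeholder sink properties; only the partition setting is the point here.
    name=kafka-iceberg-sink
    topics=my-topic
    # Field holding a unix-millisecond timestamp to partition by.
    iceberg.partition.column=created_at_ms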

This implementation seems to work for my use case; what do you think? @gliter

Resolves #18

gliter commented 1 year ago

Looks good. Could you add some test cases?

gliter commented 1 year ago

Please also update changelog and readme.

johanhenriksson commented 1 year ago

Added a simple test to ensure IcebergChangeEvent works as intended. Not sure how to properly test that the actual configuration setting is used, though. Also updated the readme & changelog.

johanhenriksson commented 1 year ago

I will double-check that everything works the way I want it tomorrow; don't merge until I mark it as ready :)

johanhenriksson commented 1 year ago

It bothers me a bit that the resulting column name (__source_ts) is not configurable. Perhaps there should be two settings?

And it would be even better to be able to pass in a timestamptz column directly and avoid the entire conversion thing. Maybe something like: if iceberg.partition.timestamp is set, attempt to convert it to a timestamp and store it in iceberg.partition.column; otherwise, use iceberg.partition.column directly.
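
Roughly, the logic I have in mind (just a sketch to illustrate the idea, none of these names are the sink's actual code):

    // Sketch only: illustrates "convert if iceberg.partition.timestamp is set,
    // otherwise use iceberg.partition.column as-is". Record shape and method
    // names are assumptions, not the sink's real API.
    import java.time.Instant;
    import java.time.OffsetDateTime;
    import java.time.ZoneOffset;
    import java.util.Map;

    public class PartitionColumnSketch {

        static OffsetDateTime resolvePartitionValue(Map<String, Object> record,
                                                    String timestampField,    // iceberg.partition.timestamp
                                                    String partitionColumn) { // iceberg.partition.column
            if (timestampField != null && !timestampField.isEmpty()) {
                // Unix-millisecond field -> timestamptz stored under the partition column name.
                long millis = ((Number) record.get(timestampField)).longValue();
                return OffsetDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
            }
            // Otherwise the partition column is expected to already exist
            // and already be a timestamp of the correct type.
            return (OffsetDateTime) record.get(partitionColumn);
        }

        public static void main(String[] args) {
            Map<String, Object> record = Map.of("__source_ts", 1700000000000L);
            System.out.println(resolvePartitionValue(record, "__source_ts", "__source_ts"));
        }
    }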

gliter commented 1 year ago

Yep. That would definitely be an improvement.

johanhenriksson commented 1 year ago

Hm, the connector suddenly stopped working for me. It seems I'm always getting this error no matter what I do, even when trying the older official versions.

Error: Table was created concurrently: raw.new-test (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-kafka-iceberg-sink-0]
org.apache.iceberg.exceptions.AlreadyExistsException: Table was created concurrently: raw.new-test
        at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:176)
        at com.getindata.kafka.connect.iceberg.sink.IcebergUtil.createIcebergTable(IcebergUtil.java:54)
        at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.lambda$loadIcebergTable$0(IcebergChangeConsumer.java:63)
        at java.base/java.util.Optional.orElseGet(Optional.java:364)
        at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:59)
        at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:50)
        at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Is this something you've seen before?

Nothing is written to S3, and I tried nuking the entire Nessie database just in case.

gliter commented 1 year ago

Not really. Maybe you could try putting some logging or a breakpoint here: https://github.com/getindata/kafka-connect-iceberg-sink/blob/develop/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergUtil.java#L37 to see when tables are created.
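
Something along these lines (just a sketch of what that method might look like based on the stack trace, not the actual source):

    // Illustrative only: an assumed shape of IcebergUtil.createIcebergTable,
    // with an extra log line so each creation attempt shows up with its
    // table identifier and thread.
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class IcebergUtilSketch {
        private static final Logger LOG = LoggerFactory.getLogger(IcebergUtilSketch.class);

        static Table createIcebergTable(Catalog catalog, TableIdentifier tableId, Schema schema) {
            // Log before the create call to see exactly when (and from which
            // thread) each table creation is attempted.
            LOG.info("Attempting to create Iceberg table {} on thread {}",
                     tableId, Thread.currentThread().getName());
            return catalog.buildTable(tableId, schema).create();
        }
    }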

johanhenriksson commented 1 year ago

Unfortunately I don't really have a local environment where I can easily debug :/ The weird thing is, there is already logging around the table creation, so I know it's not actually happening concurrently.

gliter commented 1 year ago

Oh, I thought it happened during some automated tests. There is a system test using Testcontainers; you can fiddle with it to check whether you can replicate the issue: https://github.com/getindata/kafka-connect-iceberg-sink/blob/develop/src/test/java/com/getindata/kafka/connect/iceberg/sink/IcebergSinkSystemTest.java

johanhenriksson commented 1 year ago

The problems were unrelated to the connector.

The latest commit introduces two settings:

If iceberg.partition.timestamp is empty, iceberg.partition.column is expected to already exist and be of the correct type. Not sure if this could actually happen with the current setup?

If they are set to the same field, that field is expected to be an integer timestamp, and its values will be converted to timestamptz and stored in a column with the same name, overwriting the integers.
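
To illustrate, the two modes could be configured roughly like this (the field names are just placeholders):

    # Mode 1: iceberg.partition.timestamp left unset; the partition column must
    # already exist and already be a timestamp of the correct type.
    iceberg.partition.column=event_time

    # Mode 2: both settings point at the same unix-millisecond field; its values
    # are converted to timestamptz and stored under the same name.
    iceberg.partition.timestamp=created_at_ms
    iceberg.partition.column=created_at_ms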

Sounds good?