getindata / kafka-connect-iceberg-sink


Setting partition field using property in topic's value part #18

Closed rjdp closed 1 year ago

rjdp commented 1 year ago

How can we use a partition timestamp field other than the message ingest time? I want to use the Mongo ObjectId, which I am able to convert using a custom UDF and create a stream on.

Suppose this is my stream, backed by the topic "k1":

select
  FORMAT_TIMESTAMP(FROM_UNIXTIME(objectidtoepoch(EXTRACTJSONFIELD(payload, '$.nestes.fullDocument._id["$oid"]'))), 'yyyy-MM-dd HH:mm:ss.SSS') as ts,
  EXTRACTJSONFIELD(payload, '$.nestes.fullDocument.first_name') as first_name
from data_compliance_stream;

I would like to use the ts field for partitioning

gliter commented 1 year ago

This connector consumes Debezium events. They carry a transaction timestamp, which this connector uses as the partitioning key. With the current implementation there is no possibility to configure which field is used for partitioning, but you could write a Kafka Connect transformation to put the value of that field into the ts field.

https://github.com/getindata/kafka-connect-iceberg-sink#iceberg-partitioning-support
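
A minimal sketch of what such a transformation could look like. To be clear, this is an assumption on my part and not part of this connector: the class and config names are made up, it handles only schemaless (Map) record values, and it targets the nested source.ts_ms field described later in this thread.

package com.example.transforms;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT that copies a configurable epoch-millis field from the
// record value into a nested source.ts_ms field, the location the sink is
// said to read for partitioning. Schemaless (Map) values only, for brevity.
public class SetPartitionTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("timestamp.field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Value field holding the epoch-millis timestamp to partition by");

    private String timestampField;

    @Override
    public void configure(Map<String, ?> props) {
        timestampField = (String) props.get("timestamp.field");
    }

    @Override
    @SuppressWarnings("unchecked")
    public R apply(R record) {
        if (!(record.value() instanceof Map)) {
            return record; // this sketch only handles schemaless JSON values
        }
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        Object ts = value.get(timestampField);
        if (ts != null) {
            Map<String, Object> source = new HashMap<>();
            source.put("ts_ms", ts);
            value.put("source", source);
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), record.valueSchema(), value,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}

Wired into the sink configuration it would look something like:

transforms: setts
transforms.setts.type: com.example.transforms.SetPartitionTimestamp
transforms.setts.timestamp.field: ts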

johanhenriksson commented 1 year ago

I'm also looking to use this connector without Debezium, and having a configuration option for setting the partitioning field would be great.

gliter commented 1 year ago

@johanhenriksson I don't currently have time to do the implementation here, but I would be happy to do the review.

johanhenriksson commented 1 year ago

Cool, I might give it a shot. My Java is super rusty though. Just wanted to let you know that the feature would be appreciated :)

Trying to work around it for now, but it doesn't seem to work. From the readme it seems events should have a timestamp in this format:

{
  "sourceOffset": {
    "ts_ms": 123,
  },
  // other fields...
}

But looking at the source, it looks like it should be

{
  "__source_ts_ms": 123,
  // other fields...
}

or

{
  "__source_ts": 123,
  // other fields...
}

Could you clarify where to put the timestamp? :)

gliter commented 1 year ago

@johanhenriksson It should be in the first format you presented. The reason it looks different in the source is that an unwrap transformation runs before the sink:

transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields: op,table,source.ts_ms,db
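
(Context, taken from Debezium's documented ExtractNewRecordState behavior rather than from this thread: the transform flattens the change event down to the row state and copies the metadata listed in add.fields into the flattened record under a __ prefix by default. That is why the sink's source code reads names like __source_ts_ms while producers send a nested source.ts_ms. Roughly:)

{
  "before": null,
  "after": { "first_name": "Jane" },
  "source": { "ts_ms": 123, "table": "users", "db": "app" },
  "op": "c"
}

becomes, after the unwrap transform:

{
  "first_name": "Jane",
  "__op": "c",
  "__table": "users",
  "__source_ts_ms": 123,
  "__db": "app"
}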

gliter commented 1 year ago

To be more specific, it should be:

{
  "source" : {
    "ts_ms": 123
  }
}

johanhenriksson commented 1 year ago

Thanks for the quick reply! It still doesn't seem to work, however. I'm not sure if it could be because I'm changing an already existing table?

The transform is defined on the Debezium source, and I'm not using that. Is it still the correct format? I'm trying to write events directly to a topic and have this connector write them to Iceberg.

gliter commented 1 year ago

@johanhenriksson do you create a full Debezium event, with schema and so on? Please take a look at this blog post: https://getindata.com/blog/real-time-ingestion-iceberg-kafka-connect-apache-iceberg-sink/. In the second part there is an example of creating Debezium events with Python code.
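
(The blog post's example is in Python; below is a rough Java equivalent. The envelope layout follows the standard Kafka Connect JSON converter format with schemas enabled; the topic name "k1" comes from the first comment in this thread, and everything else, the broker address included, is an assumption.)

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DebeziumStyleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address; adjust for your cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Connect JSON envelope (schemas.enable=true): "schema" describes
        // the payload, "payload" carries the data. payload.source.ts_ms is
        // the timestamp the sink reads, per the comments above.
        String value = "{"
            + "\"schema\":{\"type\":\"struct\",\"fields\":["
            +   "{\"field\":\"first_name\",\"type\":\"string\"},"
            +   "{\"field\":\"source\",\"type\":\"struct\",\"fields\":["
            +     "{\"field\":\"ts_ms\",\"type\":\"int64\"}]}]},"
            + "\"payload\":{"
            +   "\"first_name\":\"Jane\","
            +   "\"source\":{\"ts_ms\":1650000000000}}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "k1" is the topic from the first comment in this thread.
            producer.send(new ProducerRecord<>("k1", value));
            producer.flush();
        }
    }
}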