getindata / kafka-connect-iceberg-sink

Apache License 2.0

SinkRecord getting wrong message #38

Open: clazalde opened this issue 1 year ago

clazalde commented 1 year ago

Hello, I'm using this connector on AWS MSK with a Debezium PostgreSQL source. When I read the topic, I get this key and value:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" }
    ],
    "optional": false,
    "name": "debezium.public.testtable2.Key"
  },
  "payload": { "id": 2 }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      { "type": "string", "optional": false, "field": "first_name" },
      { "type": "string", "optional": false, "field": "last_name" },
      { "type": "string", "optional": false, "field": "email" },
      { "type": "string", "optional": true, "field": "op" },
      { "type": "string", "optional": true, "field": "table" },
      { "type": "int64", "optional": true, "field": "lsn" },
      { "type": "int64", "optional": true, "field": "source_ts_ms" },
      { "type": "string", "optional": true, "field": "db" }
    ],
    "optional": false,
    "name": "debezium.public.testtable2.Value"
  },
  "payload": {
    "id": 2,
    "first_name": "chris",
    "last_name": "lazalde",
    "email": "lazalde",
    "op": "c",
    "table": "testtable2",
    "lsn": 706741912,
    "source_ts_ms": 1685330874879,
    "db": "sqa-sdiv-db-sbx"
  }
}

But in the getindata Iceberg connector, the record looks like this:

SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='debezium.public.testtable2', kafkaPartition=0,
  key={ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" } ], "optional": false, "name": "debezium.public.testtable2.Key" }, "payload": { "id": 2 } },
  keySchema=Schema{STRING},
  value={ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" }, { "type": "string", "optional": true, "field": "op" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": true, "field": "lsn" }, { "type": "int64", "optional": true, "field": "source_ts_ms" }, { "type": "string", "optional": true, "field": "db" } ], "optional": false, "name": "debezium.public.testtable2.Value" }, "payload": { "id": 2, "first_name": "chris", "last_name": "lazalde", "email": "lazalde", "op": "c", "table": "testtable2", "lsn": 706741912, "source_ts_ms": 1685330874879, "db": "sqa-sdiv-db-sbx" } },
  valueSchema=Schema{STRING},
  timestamp=1685330876020, headers=ConnectHeaders(headers=)}

Any clues?
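The struct schema the sink is looking for is still present, just buried inside the string value. As a minimal sketch (Python standard library only; the helper `describe_envelope` and the `VALUE_JSON` constant are hypothetical and not part of the connector), parsing the value string from the message above recovers the struct name and its field list from the embedded `schema` block:

```python
from __future__ import annotations

import json

# Copy of the value message read from the topic (hypothetical constant name).
VALUE_JSON = (
    '{"schema": {"type": "struct", "fields": ['
    '{"type": "int32", "optional": false, "field": "id"}, '
    '{"type": "string", "optional": false, "field": "first_name"}, '
    '{"type": "string", "optional": false, "field": "last_name"}, '
    '{"type": "string", "optional": false, "field": "email"}, '
    '{"type": "string", "optional": true, "field": "op"}, '
    '{"type": "string", "optional": true, "field": "table"}, '
    '{"type": "int64", "optional": true, "field": "lsn"}, '
    '{"type": "int64", "optional": true, "field": "source_ts_ms"}, '
    '{"type": "string", "optional": true, "field": "db"}'
    '], "optional": false, "name": "debezium.public.testtable2.Value"}, '
    '"payload": {"id": 2, "first_name": "chris", "last_name": "lazalde", '
    '"email": "lazalde", "op": "c", "table": "testtable2", "lsn": 706741912, '
    '"source_ts_ms": 1685330874879, "db": "sqa-sdiv-db-sbx"}}'
)


def describe_envelope(raw: str) -> tuple[str, list[str]]:
    """Return the schema name and field names from a schema/payload JSON string."""
    doc = json.loads(raw)
    schema = doc["schema"]
    if schema.get("type") != "struct":
        raise ValueError("embedded schema is not a struct")
    return schema.get("name", ""), [f["field"] for f in schema.get("fields", [])]


name, fields = describe_envelope(VALUE_JSON)
print(name)    # debezium.public.testtable2.Value
print(fields)  # ['id', 'first_name', 'last_name', 'email', 'op', 'table', 'lsn', 'source_ts_ms', 'db']
```

In other words, the fields exist in the text, but the `SinkRecord` only exposes them as part of an opaque string whose schema is `Schema{STRING}`.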

gliter commented 1 year ago

I'm sorry, I fail to see the difference, besides the structure.

clazalde commented 1 year ago

@gliter If you take a closer look, you'll see that the keySchema and valueSchema of the SinkRecord are Schema{STRING}, and the key and value contain the whole schema and payload data as a string.

So I keep getting the error message "Schema has no fields".
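One plausible cause, offered as an assumption rather than a confirmed diagnosis: `Schema{STRING}` with the raw JSON as the value is what Kafka Connect's `StringConverter` produces, whereas `JsonConverter` with `schemas.enable=true` unwraps the `schema`/`payload` envelope into a struct. The Python sketch below models that difference in a simplified, hypothetical way (`to_connect_data`, `require_fields` and the dict-based schema are illustrations, not Kafka's Java API or the connector's code) to show why a sink that expects struct fields finds none:

```python
from __future__ import annotations

import json

# Hypothetical, simplified schema model: a dict with a "type" and a list of
# field names (empty for primitive types). This is NOT Kafka's Schema class.
STRING_SCHEMA = {"type": "string", "fields": []}


def to_connect_data(raw: str, parse_envelope: bool) -> tuple[dict, object]:
    """Mimic two converter behaviours on a raw message string.

    parse_envelope=False: pass the text through with a string schema
    (roughly what StringConverter does).
    parse_envelope=True: unwrap the schema/payload envelope
    (roughly what JsonConverter does with schemas.enable=true).
    """
    if not parse_envelope:
        return STRING_SCHEMA, raw
    doc = json.loads(raw)
    schema = doc["schema"]
    fields = [f["field"] for f in schema.get("fields", [])]
    return {"type": schema["type"], "fields": fields}, doc["payload"]


def require_fields(schema: dict) -> list[str]:
    """Hypothetical sink-side check that fails the way the issue describes."""
    if not schema["fields"]:
        raise ValueError("Schema has no fields")
    return schema["fields"]


KEY_JSON = (
    '{"schema": {"type": "struct", "fields": '
    '[{"type": "int32", "optional": false, "field": "id"}], '
    '"optional": false, "name": "debezium.public.testtable2.Key"}, '
    '"payload": {"id": 2}}'
)

schema, value = to_connect_data(KEY_JSON, parse_envelope=True)
print(require_fields(schema), value)  # prints ['id'] {'id': 2}

schema, value = to_connect_data(KEY_JSON, parse_envelope=False)
try:
    require_fields(schema)
except ValueError as err:
    print(err)  # prints Schema has no fields
```

If this is the cause, the settings worth checking in the connector or worker config are `key.converter` and `value.converter` (set to `org.apache.kafka.connect.json.JsonConverter`) together with `key.converter.schemas.enable=true` and `value.converter.schemas.enable=true`.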