scylladb / scylla-cdc-source-connector

A Kafka source connector capturing Scylla CDC changes
Apache License 2.0
41 stars 17 forks source link

before field is null in Debezium format message #31

Open newsbreak-tonglin opened 1 year ago

newsbreak-tonglin commented 1 year ago

This is an example of update operation Debezium format message when I try to use scylla-cdc-source-connector to fetch cdc data from scylla, but before field is null and it can't work together with Flink DebeziumJson

{ "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"version" }, { "type":"string", "optional":false, "field":"connector" }, { "type":"string", "optional":false, "field":"name" }, { "type":"int64", "optional":false, "field":"ts_ms" }, { "type":"string", "optional":true, "name":"io.debezium.data.Enum", "version":1, "parameters":{ "allowed":"true,last,false" }, "default":"false", "field":"snapshot" }, { "type":"string", "optional":false, "field":"db" }, { "type":"string", "optional":false, "field":"keyspace_name" }, { "type":"string", "optional":false, "field":"table_name" }, { "type":"int64", "optional":false, "field":"ts_us" } ], "optional":false, "name":"com.scylladb.cdc.debezium.connector", "field":"source" }, { "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"value" } ], "optional":true, "name":"MyScyllaCluster.dev.test_cdc.a.Cell", "field":"a" }, { "type":"int32", "optional":true, "field":"pk" }, { "type":"struct", "fields":[ { "type":"int32", "optional":true, "field":"value" } ], "optional":true, "name":"MyScyllaCluster.dev.test_cdc.v.Cell", "field":"v" } ], "optional":true, "name":"MyScyllaCluster.dev.test_cdc.Before", "field":"before" }, { "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"value" } ], "optional":true, "name":"MyScyllaCluster.dev.test_cdc.a.Cell", "field":"a" }, { "type":"int32", "optional":true, "field":"pk" }, { "type":"struct", "fields":[ { "type":"int32", "optional":true, "field":"value" } ], "optional":true, "name":"MyScyllaCluster.dev.test_cdc.v.Cell", "field":"v" } ], "optional":true, "name":"MyScyllaCluster.dev.test_cdc.After", "field":"after" }, { "type":"string", "optional":true, "field":"op" }, { "type":"int64", "optional":true, "field":"ts_ms" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"id" }, { "type":"int64", "optional":false, "field":"total_order" }, { "type":"int64", "optional":false, "field":"data_collection_order" } ], "optional":true, "field":"transaction" } ], "optional":false, "name":"MyScyllaCluster.dev.test_cdc.Envelope" }, "payload":{ "source":{ "version":"1.0.1", "connector":"scylla", "name":"MyScyllaCluster", "ts_ms":1676889174620, "snapshot":"false", "db":"dev", "keyspace_name":"dev", "table_name":"test_cdc", "ts_us":1676889174620301 }, "before":null, "after":{ "a":null, "pk":1, "v":{ "value":2222222 } }, "op":"u", "ts_ms":1676889216086, "transaction":null } }

https://github.com/apache/flink/blob/release-1.15.2/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java#L146