Closed mango009 closed 1 year ago
Hi, thanks for raising this issue. However, it seems to be more of a Kafka-related issue than an issue in the source connector.
The source connector captures row-level changes and produces a Kafka message after each INSERT, UPDATE or DELETE operation. The content of the Kafka message describes the pre-image and post-image entries, so the changed columns and their values appear in the nested structures before and after.
For example, here is the Kafka message after an INSERT operation:
{
  "source": {
    "version": "1.0.4-SNAPSHOT",
    "connector": "scylla",
    "name": "testing",
    "ts_ms": 1675094304126,
    "snapshot": {
      "string": "false"
    },
    "db": "test",
    "keyspace_name": "test",
    "table_name": "t",
    "ts_us": 1675094304126125
  },
  "before": null,
  "after": {
    "testing.test.t.After": {
      "pk": {
        "int": 14
      },
      "v": {
        "testing.test.t.v.Cell": {
          "value": {
            "int": 15
          }
        }
      }
    }
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1675099424937
  },
  "transaction": null
}
Unfortunately, Kafka single message transformations (SMTs) only support top-level fields for now; an improvement proposal to add support for nested structures was created only very recently. I could not find a way to apply a transformation to a single field inside a nested structure. One simple workaround is to use the ReplaceField transformation to drop the after or before struct entirely (the MaskField transformation cannot be used on structs). Alternatively, you could use the Flatten transformation first, so that the message has only top-level fields, and then mask out the necessary fields. The latter seems to be the smarter workaround IMHO, if it works; I haven't tested it yet.
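As a rough illustration of the Flatten-then-MaskField idea, here is a minimal and untested sketch of the connector properties. The alias flatten and the flattened field names before.ssn.value and after.ssn.value are assumptions: they are inferred from the message layout above for a non-key column named ssn, using the default "." delimiter. It would be worth inspecting a real flattened message before relying on them.

```properties
# Untested sketch: SMTs run in the order listed, so flatten first, then mask.
transforms=flatten,dataMask

# Flatten nested structs in the record value into top-level fields.
# Nested names are joined with the delimiter ("." is the default).
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=.

# Mask the flattened fields. These names are assumptions for a non-key
# column called ssn; check a flattened message for the actual names.
transforms.dataMask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.dataMask.fields=before.ssn.value,after.ssn.value
```

Note that Flatten changes the shape of every message on the topic, so downstream consumers would see fields such as source.db and after.pk instead of nested structs.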
@avelanarius should we close this issue now?
Hi, I have a user table with a column named ssn that holds SSNs. When the source connector publishes to the Kafka topic, I want to mask out the ssn field, so I use the MaskField [transform](https://docs.confluent.io/platform/current/connect/transforms/maskfield.html#maskfield). I set up the transform with the following properties:
transforms=dataMask
transforms.dataMask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.dataMask.fields=ssn
transforms.dataMask.replacement=
But I'm not seeing the field being masked. Should the field name be something other than the database column name? Thanks for any help.