RedHatInsights / expandjsonsmt

Kafka Connect SMT to expand JSON field
Apache License 2.0

Support expansion from top-level string #10

Open gunnarmorling opened 3 years ago

gunnarmorling commented 3 years ago

Hey, just came across this SMT, and it's almost what I'd need :) In my case, my entire KC record value is a string-ified JSON, e.g. this one:

"{\"payment-due\":59,\"customer-id\":456,\"order-id\":2,\"credit-card-no\":\"xxxx-yyyy-dddd-aaaa\"}"

I was hoping I could expand that by configuring "transforms.expand.sourceFields": ".", but it seems the SMT expects the KC record value to be a Struct, so this fails:

connect_1     | org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [json field expansion], found: java.lang.String
connect_1     |     at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
connect_1     |     at com.redhat.insights.expandjsonsmt.ExpandJSON.applyWithSchema(ExpandJSON.java:81)
connect_1     |     at com.redhat.insights.expandjsonsmt.ExpandJSON.apply(ExpandJSON.java:75)
connect_1     |     at com.redhat.insights.expandjsonsmt.ExpandJSON$Value.apply(ExpandJSON.java:215)
...

Would be great if expansion from a top-level String was supported too. // CC @jharting

jpavlick commented 3 years ago

You might be able to use this in conjunction with another standard SMT, HoistField. If you hoist your JSON to a field, you can then pipe it through this SMT without issue.

wallmarc commented 3 years ago

Thanks for that suggestion!

I was able to go from:

{
  "schema": {
    "type": "string",
    "optional": false
  },
  "payload": "{\"_id\": ...

to:

{
  "schema": {
    "type": "struct",
    ...
  },
  "payload": {
    "_id": ...

The transforms look like this:

    "transforms": "HoistField,expand,ExtractField",
    "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.HoistField.field": "user",
    "transforms.expand.type": "com.redhat.insights.expandjsonsmt.ExpandJSON$Value",
    "transforms.expand.sourceFields": "user",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field": "user"

In plain words, this means: first "push down" the value into an intermediate user attribute, then expand the JSON string in that attribute, and finally extract the user attribute back to the top level.
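A minimal sketch of what that three-step chain does to the record value, written in Python purely for illustration (the actual SMTs run in Java inside Kafka Connect; the field names besides "user" and "_id" are made up here):

```python
import json

# Illustrative only: mimic the effect of each transform on the record value.
value = '{"_id": 1, "name": "Ada"}'            # top-level stringified JSON

hoisted = {"user": value}                      # HoistField$Value (field=user)
expanded = dict(hoisted,
                user=json.loads(hoisted["user"]))  # ExpandJSON (sourceFields=user)
extracted = expanded["user"]                   # ExtractField$Value (field=user)

print(extracted)  # {'_id': 1, 'name': 'Ada'}
```

The intermediate "user" wrapper exists only so ExpandJSON has a named Struct field to operate on; ExtractField then discards it again.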

lbroudoux commented 3 years ago

Hey @gunnarmorling and all!

I did not see this issue before, and I developed this SMT based on your util classes (DataConverter and SchemaParser).

Check https://github.com/redhat-france-sa/microservices-saga-blueprint/blob/main/util/expand-json-smt/src/main/java/com/redhat/outbox/kafka/ExpandJSONPayload.java

Do you think it makes sense for me to contribute it here? Or should I push it directly into Debezium, since this specific need is often related to the Outbox pattern? Let me know.

gunnarmorling commented 3 years ago

Hey @lbroudoux, that's great to hear! We have been tracking this requirement in the context of Debezium's outbox router via DBZ-1297. Ideally, the existing outbox event routing SMT would have built-in support for optionally expanding the (JSON) payload. A contribution expanding the SMT along those lines would be more than welcome.

lbroudoux commented 3 years ago

OK, I read DBZ-1297, but that one seems larger, as it tries to tackle the question of transporting the schema from the application.

What I've done so far is just infer a new schema for each and every message depending on the payload content. It fits my goal of having validation-enabled JSON (or Avro) messages at the end of the chain, but it doesn't guarantee the schema will be fully accurate across all possible events of the same type.

Do you think it still may be an interesting first step for DBZ-1297?

gunnarmorling commented 3 years ago

> Do you think it still may be an interesting first step for DBZ-1297?

Yes, absolutely. What you describe makes perfect sense as the next step; we can think about propagating richer schema info from the source application later on. I'd envision the following:

This wouldn't allow us to handle advanced schema types, e.g. conveying a date via a JSON string, but that's something we can handle later on. The improvement described above would be a very useful addition.

lbroudoux commented 3 years ago

I will have a look at pushing it to DBZ tomorrow, then.

gunnarmorling commented 3 years ago

Excellent, thanks a lot!

lbroudoux commented 3 years ago

Hi @gunnarmorling , just a quick question below.

The insights utils use org.bson from MongoDB for parsing and type inference. This lib is not used on the Debezium side, and I was thinking of replacing it with Jackson. However, Jackson seems to be less precise at detecting types (float, double, int32, and int64 all appear as plain numbers when using the node type info from Jackson).

Do you think it's OK to introduce this lib? The licence for this lib seems to be APL.
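For illustration, the inference problem under discussion can be sketched like this (Python rather than the SMT's Java, and the threshold/naming choices here are my own, not the library's): map each parsed JSON value to the narrowest Connect-style schema type.

```python
import json

# Hypothetical sketch of JSON-to-Connect type inference; not the SMT's code.
def infer_connect_type(value):
    if isinstance(value, bool):      # check bool first: bool subclasses int
        return "boolean"
    if isinstance(value, int):
        # narrow to int32 when the value fits, otherwise int64
        return "int32" if -2**31 <= value < 2**31 else "int64"
    if isinstance(value, float):
        return "float64"             # JSON itself has a single number type
    if isinstance(value, str):
        return "string"
    raise ValueError(f"unsupported type: {type(value)}")

doc = json.loads('{"count": 3, "big": 3000000000, "price": 5.9}')
print({k: infer_connect_type(v) for k, v in doc.items()})
# {'count': 'int32', 'big': 'int64', 'price': 'float64'}
```

In Jackson, a similar distinction is available on the parsed tree: JsonNode exposes checks such as isInt(), isLong(), and isDouble(), which reflect how the number was actually parsed.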

lbroudoux commented 3 years ago

Answering myself: I finally found a way to do that with Jackson 😉

lbroudoux commented 3 years ago

And it's pushed! See https://github.com/debezium/debezium/pull/2710