jcustenborder / kafka-connect-json-schema


Can FromJson handle "additionalProperties": true? #22

Open mdommett opened 5 months ago

mdommett commented 5 months ago

Reopening https://github.com/jcustenborder/kafka-connect-json-schema/issues/12 since it was never fully resolved and I am having similar issues.

As a toy example, I have an event in which one of the fields is an object with ever-changing keys. For example:

data":{
"createdAt":"2024-01-29T17:49:53.388901Z"
"id":"154e1153-dd94-4138-a198-d5c4c41fa47d"
"changingField": {"value_1": true, "value_2": true}
}

I will never know what the keys of changingField will be, but I know that they will always be of boolean type.

So I create a schema:

{
  "title": "mySchema",
  "type": "object",
  "properties": {
    "data": {
      "type": "object",
      "additionalProperties": False,
      "properties": {
        "createdAt": {
          "type": "string",
          "format": "date-time",
          "description": "The date of creation"
        },
        "id": {
          "type": "string",
          "format": "uuid",
          "description": "The unique ID"
        },
        "changingField": {
          "type": "object",
          "additionalProperties": {
            "type": "boolean"
          }
        }
      }
    }
  }
}
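
What I expect that additionalProperties declaration to produce on the Connect side is a map schema: string keys, boolean values. Here is a minimal sketch of that expectation using Connect's Java API (my assumption about the mapping, not documented behavior of this SMT):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical Connect schema for changingField: the keys are unknown ahead
// of time but the values are always booleans, so a MAP type fits naturally.
Schema changingFieldSchema = SchemaBuilder
    .map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)
    .optional()
    .build();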

I then have a JDBC sink connector defined as follows:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
topics=mytopic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

transforms=FromJson,ExtractData
transforms.FromJson.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
transforms.FromJson.json.schema.location=Inline
transforms.FromJson.json.schema.inline=<<schema defined above>>
transforms.FromJson.json.schema.validation.enabled=false
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.fieldtojson.type=kafka.connect.smt.FieldToJson$Value

connection.url=XXX
connection.user=postgres
table.name.format=orders_test
auto.create=true
insert.mode=upsert
pk.mode=record_value
pk.fields=id
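
(For context, the transforms.fieldtojson line points at a custom SMT that serializes a free-form field to a JSON string, so the sink sees a plain string column instead of a struct. The following is only my rough sketch of what such a transform might look like, not the actual kafka.connect.smt.FieldToJson code; it assumes Jackson for serialization and a Struct record value whose target field holds a java.util.Map.)

package kafka.connect.smt;

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

public class FieldToJson<R extends ConnectRecord<R>> implements Transformation<R> {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private String fieldName;

  @Override
  public void configure(Map<String, ?> configs) {
    fieldName = (String) configs.get("field");
  }

  @Override
  public R apply(R record) {
    Struct value = (Struct) record.value();
    Schema schema = value.schema();

    // Rebuild the value schema with the target field downgraded to a string.
    SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
    for (Field f : schema.fields()) {
      builder.field(f.name(),
          f.name().equals(fieldName) ? Schema.OPTIONAL_STRING_SCHEMA : f.schema());
    }
    Schema newSchema = builder.build();

    // Copy every field, serializing the target field to a JSON string.
    Struct newValue = new Struct(newSchema);
    for (Field f : newSchema.fields()) {
      if (f.name().equals(fieldName)) {
        Object raw = value.get(fieldName); // assumed to be a java.util.Map
        try {
          newValue.put(f.name(), raw == null ? null : MAPPER.writeValueAsString(raw));
        } catch (Exception e) {
          throw new DataException("Failed to serialize " + fieldName, e);
        }
      } else {
        newValue.put(f.name(), value.get(f.name()));
      }
    }

    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef().define("field", ConfigDef.Type.STRING,
        ConfigDef.Importance.HIGH, "Field to serialize as a JSON string");
  }

  @Override
  public void close() {}
}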

However, this produces the following error:

null (STRUCT) type doesn't have a mapping to the SQL database column type

This is because changingField has not been given a valid Struct schema.

Any help on how I can use the connector for this use case?

Thanks!

morokin commented 4 months ago

Came here for similar reasons.

Confirming the issue, same case: a field with a variable number of keys of the same type, with a schema like the one below, is not working as expected:

"Header": {
  "type": "object",
  "additionalProperties": {
    "type": "array",
    "items": { "type": "string" }
  }
},

I'm converting to Avro with the S3 sink connector, and the resulting Avro schema fragment is:

{
  "name": "Header",
  "type": {
    "name": "properties.Header",
    "type": "record",
    "fields": []
  }
},

so "additionalProperties" is not affecting anything.