Open marlonpatrick opened 3 years ago
@marlonpatrick It should. Do you have an example schema and data that we can use as a test case?
I am using the MongoDB source connector. My connector configuration is as follows:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
change.stream.full.document=updateLookup
connection.uri=mongodb://mongodb:27017
database=my-database
pipeline=[{"$match": { "operationType": { "$in": [ "insert", "replace", "update" ] } }}]
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://kafka-schema-registry:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://kafka-schema-registry:8081
transforms=fromJsonKey,fromJsonValue
transforms.fromJsonKey.type=com.github.jcustenborder.kafka.connect.json.FromJson$Key
transforms.fromJsonValue.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
transforms.fromJsonKey.json.schema.location=Inline
transforms.fromJsonValue.json.schema.location=Inline
transforms.fromJsonKey.json.schema.inline={ "$id": "http://example.com/example.json", "$schema": "http://json-schema.org/draft-07/schema", "type": "object", "required": [ "_id" ], "properties": { "_id": { "$id": "#/properties/_id", "type": "object", "required": [ "_data" ], "properties": { "_data": { "$id": "#/properties/_id/properties/_data", "type": "string", } }, "additionalProperties": true } }, "additionalProperties": true }
transforms.fromJsonValue.json.schema.inline={ "$schema":"http://json-schema.org/draft-07/schema", "$id":"http://example.com/example.json", "type":"object", "required":[ "_id", "operationType", "clusterTime", "fullDocument", "ns", "documentKey" ], "properties":{ "_id":{ "$id":"#/properties/_id", "type":"object", "required":[ "_data" ], "properties":{ "_data":{ "$id":"#/properties/_id/properties/_data", "type":"string" } }, "additionalProperties":true }, "operationType":{ "$id":"#/properties/operationType", "type":"string" }, "clusterTime":{ "$id":"#/properties/clusterTime", "type":"object", "required":[ "$timestamp" ], "properties":{ "$timestamp":{ "$id":"#/properties/clusterTime/properties/%24timestamp", "type":"object", "required":[ "t", "i" ], "properties":{ "t":{ "$id":"#/properties/clusterTime/properties/%24timestamp/properties/t", "type":"integer" }, "i":{ "$id":"#/properties/clusterTime/properties/%24timestamp/properties/i", "type":"integer" } }, "additionalProperties":true } }, "additionalProperties":true }, "fullDocument":{ "$id":"#/properties/fullDocument", "type":"object", "additionalProperties":true }, "ns":{ "$id":"#/properties/ns", "type":"object", "required":[ "db", "coll" ], "properties":{ "db":{ "$id":"#/properties/ns/properties/db", "type":"string" }, "coll":{ "$id":"#/properties/ns/properties/coll", "type":"string" } }, "additionalProperties":true }, }, "additionalProperties":true }
The MongoDB connector generates messages as strings (StringConverter). Here is an example of an original message:
{"_id": {"_data": "825F52D0F3000000012B022C0100296E5A1004F27C4F9C01A246ABA5897A4D40F9D22B46645F696400645F52CF6985CC6000C49F6DDE0004"}, "operationType": "replace", "clusterTime": {"$timestamp": {"t": 1599262963, "i": 1}}, "fullDocument": {"_id": {"$oid": "5f52cf6985cc6000c49f6dde"}, "field1": "value1", "object1": {"field": 3.0}}, "ns": {"db": "rm-memories-mngmt", "coll": "fromjson-collection"}, "documentKey": {"_id": {"$oid": "5f52cf6985cc6000c49f6dde"}}}
And here is the converted message with FromJson transformation:
{"_id":{"_data":"825F52D222000000012B022C0100296E5A1004F27C4F9C01A246ABA5897A4D40F9D22B46645F696400645F52CF6985CC6000C49F6DDE0004"},"clusterTime":{"$timestamp":{"i":1,"t":1599263266}},"fullDocument":{},"ns":{"coll":"fromjson-collection","db":"rm-memories-mngmt"},"operationType":"replace"}
Pay attention to the fullDocument and documentKey fields: fullDocument comes out empty and documentKey is missing entirely.
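To make the loss easier to see, here is a minimal Python sketch that compares the field paths of the two messages quoted above. The helper name `leaf_paths` is made up for this illustration and is not part of any connector or transform. It compares paths rather than values, because the two samples come from different change events (their `_data` tokens and timestamps differ).

```python
import json

# The two messages quoted above, copied as-is.
original = json.loads(r"""
{"_id": {"_data": "825F52D0F3000000012B022C0100296E5A1004F27C4F9C01A246ABA5897A4D40F9D22B46645F696400645F52CF6985CC6000C49F6DDE0004"},
 "operationType": "replace",
 "clusterTime": {"$timestamp": {"t": 1599262963, "i": 1}},
 "fullDocument": {"_id": {"$oid": "5f52cf6985cc6000c49f6dde"}, "field1": "value1", "object1": {"field": 3.0}},
 "ns": {"db": "rm-memories-mngmt", "coll": "fromjson-collection"},
 "documentKey": {"_id": {"$oid": "5f52cf6985cc6000c49f6dde"}}}
""")
converted = json.loads(r"""
{"_id":{"_data":"825F52D222000000012B022C0100296E5A1004F27C4F9C01A246ABA5897A4D40F9D22B46645F696400645F52CF6985CC6000C49F6DDE0004"},
 "clusterTime":{"$timestamp":{"i":1,"t":1599263266}},
 "fullDocument":{},
 "ns":{"coll":"fromjson-collection","db":"rm-memories-mngmt"},
 "operationType":"replace"}
""")


def leaf_paths(node, prefix=""):
    """Return the dotted paths of all non-object values (hypothetical helper)."""
    if isinstance(node, dict):
        paths = set()
        for key, value in node.items():
            paths |= leaf_paths(value, f"{prefix}.{key}" if prefix else key)
        return paths  # an empty object contributes no paths
    return {prefix}


# Paths present before the transformation but absent afterwards.
missing = sorted(leaf_paths(original) - leaf_paths(converted))
for path in missing:
    print(path)
```

Every path it reports sits under fullDocument or documentKey, that is, under parts of the message the inline schema does not list under "properties".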
My environment details:
mongo:4.4.0 zookeeper:3.4.9 confluentinc/cp-kafka:5.5.1 confluentinc/cp-schema-registry:5.5.1 confluentinc/cp-kafka-rest:5.5.1 confluentinc/cp-kafka-connect:5.5.1
mongo-kafka-1.2.0-all.jar jcustenborder-kafka-connect-json-schema-0.2.5
Facing the same problem. Is there any supported solution for this?
Facing the same problem: any fields that are not defined in an object are just ignored. I am using a JDBC sink connector and unfortunately have a field in my JSON event that changes dynamically. I would like to just set additionalProperties = {type: boolean} in my schema, but it seems that any additional properties are ignored and are not passed to the resulting Struct. Any help on how to overcome this would be appreciated.
@marlonpatrick did you manage to find a solution to the above issue?
Came to this issue because of similar reasons:
Confirming the issue, same case: a field with a variable number of keys of the same type, with a schema like the one below:
"Header": { "type": "object", "additionalProperties": { "type": "array", "items": { "type": "string" } } },
is not working as expected. I'm converting to Avro with the S3 sink connector, and the resulting part of the Avro schema is:
{ "name": "Header", "type": { "name": "properties.Header", "type": "record", "fields": [] } },
"additionalProperties" has no effect.
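For comparison, an object that declares no fixed properties and only a typed "additionalProperties" reads naturally as a map. The Python sketch below shows the Avro shape one might expect for the Header schema above. It is only an illustration of that expectation under a tiny subset of JSON Schema; the function `json_schema_to_avro` is hypothetical and does not describe what the transform or the S3 sink actually does.

```python
def json_schema_to_avro(schema, name="Root"):
    """Map a small subset of JSON Schema to an Avro schema (illustrative only)."""
    kind = schema.get("type")
    if kind == "string":
        return "string"
    if kind == "integer":
        return "long"
    if kind == "array":
        return {"type": "array", "items": json_schema_to_avro(schema["items"], name)}
    if kind == "object":
        extra = schema.get("additionalProperties")
        # No fixed properties plus a typed additionalProperties: treat as a map.
        if isinstance(extra, dict) and not schema.get("properties"):
            return {"type": "map", "values": json_schema_to_avro(extra, name)}
        fields = [
            {"name": key, "type": json_schema_to_avro(sub, key)}
            for key, sub in schema.get("properties", {}).items()
        ]
        return {"type": "record", "name": name, "fields": fields}
    raise ValueError(f"unsupported type in this sketch: {kind!r}")


# The Header schema from the comment above.
header_schema = {
    "type": "object",
    "additionalProperties": {"type": "array", "items": {"type": "string"}},
}
header_avro = json_schema_to_avro(header_schema, "Header")
print(header_avro)
```

Under these assumptions Header becomes an Avro map whose values are arrays of strings, instead of the empty record shown above.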
Hello,
I'm running some tests with FromJson, and it works well for the fields listed explicitly in the JSON schema (json.schema.inline). However, fields that are not listed are dropped from the message even when I set "additionalProperties": true.
My question is: can this transformation handle "additionalProperties": true properly and keep all additional fields?
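The behaviour reported throughout this thread can be modelled with a few lines of Python. This is a toy model of the observed output, assuming the conversion keeps only what is listed under "properties"; the function `project` is hypothetical and is not taken from the transform's source code. The schema and message are trimmed-down versions of the ones posted earlier in the thread.

```python
def project(value, schema):
    """Keep only declared properties; "additionalProperties" is never consulted."""
    if schema.get("type") != "object" or not isinstance(value, dict):
        return value
    declared = schema.get("properties", {})
    return {
        key: project(value[key], sub_schema)
        for key, sub_schema in declared.items()
        if key in value
    }


# Trimmed-down schema: fullDocument is an open object, documentKey is not declared.
schema = {
    "type": "object",
    "properties": {
        "operationType": {"type": "string"},
        "fullDocument": {"type": "object", "additionalProperties": True},
    },
    "additionalProperties": True,
}
message = {
    "operationType": "replace",
    "fullDocument": {"field1": "value1", "object1": {"field": 3.0}},
    "documentKey": {"_id": {"$oid": "5f52cf6985cc6000c49f6dde"}},
}

projected = project(message, schema)
print(projected)
```

In this model fullDocument ends up empty and documentKey disappears, which matches the converted message reported above. A fixed-field structure has no slot for undeclared keys, so keeping them would need some other representation, such as a map or a serialized string.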