jcustenborder / kafka-connect-json-schema

Apache License 2.0

Using different data type for one particular field in the schema #20

Open shaktipanda opened 1 year ago

shaktipanda commented 1 year ago

I developed a platform event source connector that uses the FromJson transformation. I am trying to change the field named value inside the characteristic array (declaring its type as any instead of number or string) so that it accepts both string and integer values, as shown in the configuration below:

{
  "name": "SFI-updateCase-Dunning-B2B-Source-dev",
  "config": {
    "connector.class": "io.confluent.salesforce.SalesforcePlatformEventSourceConnector",
    "tasks.max": "1",
    "transforms": "ExtractField,fromJson",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field": "Messagec",
    "transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
    "transforms.fromJson.json.schema.location": "Inline",
    "transforms.fromJson.json.schema.inline": "{\"type\":\"object\",\"required\":[],\"properties\":{\"eventId\":{\"type\":\"string\"},\"eventTime\":{\"type\":\"string\"},\"correlationId\":{\"type\":\"string\"},\"domain\":{\"type\":\"string\"},\"eventType\":{\"type\":\"string\"},\"title\":{\"type\":\"string\"},\"description\":{\"type\":\"string\"},\"attype\":{\"type\":\"string\"},\"event\":{\"type\":\"object\",\"required\":[],\"properties\":{\"troubleTicket\":{\"type\":\"object\",\"required\":[],\"properties\":{\"id\":{\"type\":\"string\"},\"creationDate\":{\"type\":\"string\"},\"href\":{\"type\":\"string\"},\"name\":{\"type\":\"string\"},\"ticketType\":{\"type\":\"string\"},\"description\":{\"type\":\"string\"},\"externalId\":{\"type\":\"string\"},\"severity\":{\"type\":\"string\"},\"status\":{\"type\":\"string\"},\"statusChangeDate\":{\"type\":\"string\"},\"relatedParty\":{\"type\":\"array\",\"items\":{\"type\":\"object\",\"required\":[],\"properties\":{\"atreferredType\":{\"type\":\"string\"},\"id\":{\"type\":\"string\"}}}},\"characteristic\":{\"type\":\"array\",\"items\":{\"type\":\"object\",\"required\":[],\"properties\":{\"value\":{\"type\":\"any\"},\"name\":{\"type\":\"string\"}}}}}}}}}}",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "salesforce.instance": "${file:/mnt/secrets/connector-creds/sfdc_dev4.properties:SALESFORCE_URL}",
    "salesforce.username": "${file:/mnt/secrets/connector-creds/sfdc_dev4.properties:SALESFORCE_USERNAME}",
    "salesforce.password": "${file:/mnt/secrets/connector-creds/sfdc_dev4.properties:SALESFORCE_PASSWORD}",
    "salesforce.password.token": "${file:/mnt/secrets/connector-creds/sfdc_dev4.properties:SALESFORCE_PASSWORD_TOKEN}",
    "salesforce.consumer.key": "${file:/mnt/secrets/connector-creds/sfdc_dev4.properties:SALESFORCE_CONSUMER_KEY}",
    "salesforce.consumer.secret": "${file:/mnt/secrets/connector-creds/sfdc_dev4.properties:SALESFORCE_CONSUMER_SECRET}",
    "http.proxy": "10.169.127.8:8080",
    "kafka.topic": "tef.de.UpdateCase-B2B-v1",
    "kafka.topic.lowercase": "false",
    "salesforce.initial.start": "all",
    "salesforce.platform.event.name": "Update_Case_to_Dunninge",
    "value.converter.schema.registry.ssl.key.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SSL_PASSWORD}",
    "value.converter.schema.registry.ssl.keystore.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SSL_PASSWORD}",
    "key.converter.schema.registry.ssl.keystore.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SSL_PASSWORD}",
    "key.converter.schema.registry.ssl.truststore.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SSL_PASSWORD}",
    "value.converter.schema.registry.url": "${file:/mnt/secrets/connector-creds/sfdc.properties:EMP_SCHEMA_REGISTRY_URL}",
    "key.converter.schema.registry.ssl.key.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SSL_PASSWORD}",
    "key.converter.schema.registry.ssl.keystore.location": "/mnt/sslcerts/keystore.p12",
    "value.converter.schema.registry.ssl.truststore.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SSL_PASSWORD}",
    "key.converter.schema.registry.ssl.truststore.location": "/mnt/sslcerts/truststore.p12",
    "value.converter.schema.registry.ssl.keystore.location": "/mnt/sslcerts/keystore.p12",
    "value.converter.schema.registry.ssl.truststore.location": "/mnt/sslcerts/truststore.p12",
    "key.converter.schema.registry.url": "${file:/mnt/secrets/connector-creds/sfdc.properties:EMP_SCHEMA_REGISTRY_URL}"
  }
}

With this configuration I get the following error:

Caused by: org.everit.json.schema.SchemaException: #/properties/event/properties/troubleTicket/properties/characteristic/items/properties/value: unknown type: [any]
	at org.everit.json.schema.loader.TypeBasedSchemaExtractor.loadForExplicitType(SchemaExtractor.java:279)
	at org.everit.json.schema.loader.JsonValue$Multiplexer.requireAny(JsonValue.java:46)
	at org.everit.json.schema.loader.TypeBasedSchemaExtractor.extract(SchemaExtractor.java:247)
	at org.everit.json.schema.loader.AbstractSchemaExtractor.extract(SchemaExtractor.java:113)
	at org.everit.json.schema.loader.SchemaLoader.runSchemaExtractors(SchemaLoader.java:424)
	at

Could you please suggest how we can configure this transformation so that it handles both integer and string values?

I am also sharing the payload for reference:

{"eventId":"3b3ce99d-beda-5c69-9363-a65492a7f66a","eventTime":"2022-11-18T08:28:59","eventType":"TroubleTicketUpdateEvent","domain":"CRM","attype":"TroubleTicket","title":"Update Trouble Ticket","description":"Update Trouble Ticket","event":{"troubleTicket":{"statusChangeDate":"2022-11-18T08:28:59","creationDate":"2022-11-18T08:28:59","name":"Promise to Pay","ticketType":"Dunning","description":"Create Case","severity":"High","id":"00051199","href":"https://tefb2b--b2bdev4--vlocity-cmt.sandbox.vf.force.com/lightning/r/Case/5001l00000AC5sZAAT/view","status":"Promise to Pay","statusChangeReason":"Need more information from the customer","relatedParty":[{"atreferredType":"Billing Account","id":"A1-0000000373"}],"characteristic":[{"value":"Auto","name":"dunningstoptype"},{"value":34,"name":"numberofdays"},{"value":true,"name":"dunningstoprequested"},{"value":"00051199","name":"requestID"}]}}}

jcustenborder commented 1 year ago

@shaktipanda Hmm, that exception is coming from the library I am using to process the schema. The first thing I would do is validate the schema. Once you get past that, keep in mind that Connect does not understand a field that can be either an integer or a string. In Avro this is called a union. You might need to process this with something like Kafka Streams or KSQL.
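
On the validation point: `any` is not one of the JSON Schema `type` keywords (`string`, `number`, `integer`, `boolean`, `object`, `array`, `null`), which is consistent with the `unknown type: [any]` message. On the union point, one possible workaround is to normalize every `value` to a string upstream, so that the schema can declare `{"type":"string"}` for that field. The sketch below shows only that normalization step on plain Java maps. The class name `CharacteristicNormalizer` and the method `stringifyValues` are hypothetical, and wiring this into Kafka Streams or a custom transform is left out; it is an illustration under those assumptions, not what the FromJson transformation does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of kafka-connect-json-schema.
public class CharacteristicNormalizer {

    // Returns a copy of the characteristic list in which every non-null
    // "value" entry is converted to its String form (34 -> "34", true -> "true").
    public static List<Map<String, Object>> stringifyValues(
            List<Map<String, Object>> characteristics) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> item : characteristics) {
            Map<String, Object> copy = new LinkedHashMap<>(item);
            Object value = copy.get("value");
            if (value != null) {
                copy.put("value", String.valueOf(value));
            }
            result.add(copy);
        }
        return result;
    }

    // Builds one characteristic entry; used only by the demo in main.
    private static Map<String, Object> entry(String name, Object value) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("name", name);
        m.put("value", value);
        return m;
    }

    public static void main(String[] args) {
        // Entries taken from the payload shared in this issue.
        List<Map<String, Object>> characteristics = new ArrayList<>();
        characteristics.add(entry("dunningstoptype", "Auto"));
        characteristics.add(entry("numberofdays", 34));
        characteristics.add(entry("dunningstoprequested", true));

        for (Map<String, Object> item : stringifyValues(characteristics)) {
            System.out.println(item.get("name") + " = " + item.get("value"));
        }
    }
}
```

The trade-off is that consumers lose the original type and have to parse `"34"` or `"true"` back themselves if they need it.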