jcustenborder / kafka-connect-json-schema

Apache License 2.0
14 stars 14 forks source link

Connector failing while using @ in the schema #19

Open shaktipanda opened 2 years ago

shaktipanda commented 2 years ago

Hi,

I am developing a sfdcplatformevent source connector where the configuration is

{ "name": "SFDC_scenariomatrix_billing_source", "config": { "connector.class": "io.confluent.salesforce.SalesforcePlatformEventSourceConnector", "tasks.max": "1", "kafka.topic":"tef.de.scenariomatrix-B2B-v1", "kafka.topic.lowercase":"false", "transforms": "ExtractField,fromJson", "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractField.field":"TextAreac", "transforms.fromJson.type" : "com.github.jcustenborder.kafka.connect.json.FromJson$Value", "transforms.fromJson.json.schema.location" : "Inline", "transforms.fromJson.json.schema.inline" : "{\"type\":\"object\",\"properties\":{\"eventId\":{\"type\":\"string\"},\"eventTime\":{\"type\":\"string\"},\"eventType\":{\"type\":\"string\"},\"domain\":{\"type\":\"string\"},\"@type\":{\"type\":\"string\"},\"@baseType\":{\"type\":\"string\"},\"event\":{\"type\":\"object\",\"properties\":{\"scenarioMatrix\":{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"href\":{\"type\":\"string\"},\"@type\":{\"type\":\"string\"}}}}}}}", "value.converter":"io.confluent.connect.avro.AvroConverter", "key.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.ssl.keystore.password":"mystorepassword", "key.converter.schema.registry.ssl.keystore.password":"mystorepassword", "key.converter.schema.registry.ssl.truststore.password":"mystorepassword", "value.converter.schema.registry.url":"https://schemaregistry.ict.svc.cluster.local:8081", "key.converter.schema.registry.ssl.keystore.location":"/mnt/sslcerts/keystore.p12", "value.converter.schema.registry.ssl.truststore.password":"mystorepassword", "key.converter.schema.registry.ssl.truststore.location":"/mnt/sslcerts/truststore.p12", "value.converter.schema.registry.ssl.keystore.location":"/mnt/sslcerts/keystore.p12", "value.converter.schema.registry.ssl.truststore.location":"/mnt/sslcerts/truststore.p12", "key.converter.schema.registry.url":"https://schemaregistry.ict.svc.cluster.local:8081", "salesforce.instance": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_URL}", "salesforce.username": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_USERNAME}", "salesforce.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_PASSWORD}", "salesforce.password.token": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_PASSWORD_TOKEN}", "http.proxy": "10.169.127.8:8080", "salesforce.consumer.key": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_CONSUMER_KEY}", "salesforce.consumer.secret": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_CONSUMER_SECRET}", "salesforce.initial.start": "all", "salesforce.platform.event.name": "TEF_ScenarioMatrixe" } }

when schema in the above configuration is used with @ we get the following error

nCaused by: org.apache.avro.SchemaParseException: Illegal initial character: @baseType\n\tat org.apache.avro.Schema.validateName(Schema.java:1567)\n\tat org.apache.avro.Schema.access$400(Schema.java:92)\n\tat org.apache.avro.Schema$Field.(Schema.java:549)\n\tat org.apache.avro.Schema$Field.(Schema.java:588)\n\tat io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1175)\n\tat io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:952)\n\tat io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:760)\n\tat io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:745)\n\tat io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:85)\n\tat org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$4(WorkerSourceTask.java:333)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\t... 11 more\n"}]

But when we use the below configuratin where there is no @ in the schema

{ "name": "SFDC_scenariomatrix_billing_source", "config": { "connector.class": "io.confluent.salesforce.SalesforcePlatformEventSourceConnector", "tasks.max": "1", "kafka.topic":"tef.de.scenariomatrix-B2B-v1", "kafka.topic.lowercase":"false", "transforms": "ExtractField,fromJson", "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractField.field":"TextAreac", "transforms.fromJson.type" : "com.github.jcustenborder.kafka.connect.json.FromJson$Value", "transforms.fromJson.json.schema.location" : "Inline", "transforms.fromJson.json.schema.inline" : "{\"type\":\"object\",\"properties\":{\"eventId\":{\"type\":\"string\"},\"eventTime\":{\"type\":\"string\"},\"eventType\":{\"type\":\"string\"},\"domain\":{\"type\":\"string\"},\"type\":{\"type\":\"string\"},\"baseType\":{\"type\":\"string\"},\"event\":{\"type\":\"object\",\"properties\":{\"scenarioMatrix\":{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"href\":{\"type\":\"string\"},\"type\":{\"type\":\"string\"}}}}}}}", "value.converter":"io.confluent.connect.avro.AvroConverter", "key.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.ssl.keystore.password":"mystorepassword", "key.converter.schema.registry.ssl.keystore.password":"mystorepassword", "key.converter.schema.registry.ssl.truststore.password":"mystorepassword", "value.converter.schema.registry.url":"https://schemaregistry.ict.svc.cluster.local:8081", "key.converter.schema.registry.ssl.keystore.location":"/mnt/sslcerts/keystore.p12", "value.converter.schema.registry.ssl.truststore.password":"mystorepassword", "key.converter.schema.registry.ssl.truststore.location":"/mnt/sslcerts/truststore.p12", "value.converter.schema.registry.ssl.keystore.location":"/mnt/sslcerts/keystore.p12", "value.converter.schema.registry.ssl.truststore.location":"/mnt/sslcerts/truststore.p12", "key.converter.schema.registry.url":"https://schemaregistry.ict.svc.cluster.local:8081", "salesforce.instance": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_URL}", "salesforce.username": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_USERNAME}", "salesforce.password": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_PASSWORD}", "salesforce.password.token": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_PASSWORD_TOKEN}", "http.proxy": "10.169.127.8:8080", "salesforce.consumer.key": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_CONSUMER_KEY}", "salesforce.consumer.secret": "${file:/mnt/secrets/connector-creds/sfdc.properties:SALESFORCE_CONSUMER_SECRET}", "salesforce.initial.start": "all", "salesforce.platform.event.name": "TEF_ScenarioMatrixe" } }

the connector runs fine without giving any error .

So can u please tell us that how can we resolve this issue of @ and if @ will be supported by kafka connector or not