awslabs / aws-glue-schema-registry

The AWS Glue Schema Registry Client library provides serializers and deserializers for applications to integrate with the AWS Glue Schema Registry service. The library currently supports the Avro, JSON, and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Apache License 2.0

JSON schema connect converter throwing java.lang.UnsupportedOperationException #250

Open flo-mair opened 1 year ago

flo-mair commented 1 year ago

I have a self-built Kafka connector and I am trying to integrate it with GSR. The AVRO converter works as expected, but when I move to the JSON converter, Kafka Connect throws an UnsupportedOperationException:

 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
connect  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
connect  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
connect  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
connect  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
connect  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
connect  |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect  |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect  |      at java.base/java.lang.Thread.run(Thread.java:829)
connect  | Caused by: java.lang.UnsupportedOperationException: Data Format is not supported null
connect  |      at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerFactory.getInstance(GlueSchemaRegistryDeserializerFactory.java:72)
connect  |      at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(GlueSchemaRegistryDeserializationFacade.java:171)

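From the stack trace, the factory appears to pick a deserializer based on the data format attached to the schema version it retrieves, rather than on the converter's dataFormat property. Below is an illustrative sketch of that dispatch, reconstructed from the error message and stack trace; it is not the actual library source:

  import software.amazon.awssdk.services.glue.model.DataFormat;

  class DeserializerFactorySketch {
      Object getInstance(DataFormat dataFormat) {
          if (DataFormat.AVRO.equals(dataFormat)) {
              return new Object(); // stand-in for the Avro deserializer
          } else if (DataFormat.JSON.equals(dataFormat)) {
              return new Object(); // stand-in for the JSON schema deserializer
          }
          // A null (or unrecognized) format falls through to here, yielding
          // exactly "Data Format is not supported null" as in the trace above.
          throw new UnsupportedOperationException(
                  String.format("Data Format is not supported %s", dataFormat));
      }
  }
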
It seems that dataFormat is not set, even though the connector properties set it to JSON:

  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
  "value.converter.region":"us-east-1",
  "value.converter.dataFormat":"JSON",
  "value.converter.schemaAutoRegistrationEnabled":"true",
  "value.converter.schemaName":"payload-json",
  "value.converter.schemas.enable": "true",
  "value.converter.avroRecordType": "GENERIC_RECORD"

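One way to narrow this down could be to deserialize a single record outside Connect with the library's plain Kafka deserializer and see whether the same exception appears. A minimal sketch, assuming the GlueSchemaRegistryKafkaDeserializer and AWSSchemaRegistryConstants property keys from this library (the topic name and raw bytes are placeholders):

  import java.util.HashMap;
  import java.util.Map;

  import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
  import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;

  public class StandaloneDeserializeCheck {
      public static void main(String[] args) {
          Map<String, Object> config = new HashMap<>();
          config.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
          config.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON");

          GlueSchemaRegistryKafkaDeserializer deserializer =
                  new GlueSchemaRegistryKafkaDeserializer();
          deserializer.configure(config, false); // false = value deserializer

          byte[] rawValue = null; // substitute one raw record value read from the topic
          // "my-topic" is a placeholder; use the actual sink topic name here.
          Object value = deserializer.deserialize("my-topic", rawValue);
          System.out.println(value);
      }
  }
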
Any advice on how to resolve this issue?

mohitpali commented 1 year ago

Can you please share more logs, as well as the properties you use for AVRO vs. the properties you use for JSON?

The integration tests run with the same properties here and are working fine.