awslabs / aws-glue-schema-registry

The AWS Glue Schema Registry Client library provides serializers and deserializers that let applications integrate with the AWS Glue Schema Registry service. The library currently supports the Avro, JSON, and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Apache License 2.0

Unsupported type passed for serialization: com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.NonRecordContainer #316

Open · chriline opened this issue 8 months ago

chriline commented 8 months ago

Hi,

I'm trying to use this library together with the lenses-mqtt-source-connector (https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/sources/mqttsourceconnector/), which consumes JSON from an MQTT topic.

I want the converter to pick up the JSON (for which I have no schema), derive the corresponding Avro schema, register that schema in Glue, and publish the message to Kafka in Avro format. Is this something that can be achieved with this library?

This is my config:

name=lenses-5-mqtt-source
tasks.max=1

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=true
connect.mqtt.hosts=wss://my-mqtt-broker:1910
connect.mqtt.username=MyUserName
connect.mqtt.service.quality=2
connect.mqtt.kcql=INSERT INTO TestTopic SELECT * FROM MyTopic WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonPassThroughConverter`
connect.mqtt.password=MyPassword
connect.mqtt.client.id=MyClientId
# VALUE-CONVERTER
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.dataFormat=AVRO
value.converter.schemaAutoRegistrationEnabled=true
value.converter.registry.name=my-registry
value.converter.endpoint=https://glue.eu-central-1.amazonaws.com
value.converter.region=eu-central-1
value.converter.avroRecordType=SPECIFIC_RECORD
value.converter.schemaName=my-schema-name
value.converter.schemas.enable=true
value.converter.compatibility=NONE
# KEY-CONVERTER
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
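
As an aside: stream-reactor also ships schema-inferring source converters. Assuming com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter is available in this stream-reactor version (which I have not verified against 5.0), the KCQL line could be changed as below so that the source emits a Connect Struct with an inferred schema instead of the schemaless String described in the observation that follows:

connect.mqtt.kcql=INSERT INTO TestTopic SELECT * FROM MyTopic WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`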

I can see the message hit AWSKafkaAvroConverter::fromConnectData(String topic, Schema schema, Object value); at that point, schema is null and value is the JSON string (of type String).
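
To make the failing call concrete, here is a minimal, hypothetical reproduction of the input the converter receives (the payload string is made up, and the converter configuration is omitted; the method signature is the one named above):

    import org.apache.kafka.connect.data.Schema;

    import com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter;

    public class GlueConverterRepro {
        public static void main(String[] args) {
            AWSKafkaAvroConverter converter = new AWSKafkaAvroConverter();
            // In practice the converter must be configured first with the registry
            // settings from the config above, e.g. converter.configure(props, false);

            Schema schema = null; // JsonPassThroughConverter attaches no Connect schema
            String value = "{\"temperature\": 21.5}"; // raw JSON payload as a plain String

            // Throws DataException caused by AWSSchemaRegistryException:
            // "Unsupported type passed for serialization: ...NonRecordContainer"
            byte[] bytes = converter.fromConnectData("TestTopic", schema, value);
        }
    }

This matches the stack trace below.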

2023-12-20 13:27:25,921 ERROR [task-thread-lenses-5-mqtt-source-0] [org.apache.kafka.connect.runtime.WorkerTask] [doRun:212] WorkerSourceTask{id=lenses-5-mqtt-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.connect.errors.DataException: Converting Kafka Connect data to byte[] failed due to serialization error: 
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:109)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 common frames omitted
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Unsupported type passed for serialization: com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.NonRecordContainer@95171f16
    at com.amazonaws.services.schemaregistry.serializers.avro.AvroSerializer.createDatumWriter(AvroSerializer.java:94)
    at com.amazonaws.services.schemaregistry.serializers.avro.AvroSerializer.serialize(AvroSerializer.java:64)
    at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.serialize(GlueSchemaRegistrySerializationFacade.java:108)
    at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.serialize(AWSKafkaAvroSerializer.java:123)
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:107)
    ... 17 common frames omitted
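
The bottom frame points at a type-dispatch miss in AvroSerializer.createDatumWriter. The sketch below is my re-creation of that dispatch from the observed behavior, not the library source (the real method presumably has more branches): a value wrapped in NonRecordContainer matches neither record branch and falls through to the exception.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.specific.SpecificRecord;

    public class DatumWriterDispatchSketch {
        // Hypothetical re-creation of the dispatch seen in the stack trace.
        static DatumWriter<Object> createDatumWriter(Object value, Schema schema) {
            if (value instanceof SpecificRecord) {   // avroRecordType=SPECIFIC_RECORD path
                return new SpecificDatumWriter<>(schema);
            }
            if (value instanceof GenericRecord) {    // GENERIC_RECORD path
                return new GenericDatumWriter<>(schema);
            }
            // A NonRecordContainer wrapping a primitive (here: the JSON String)
            // is neither, so it falls through to the "Unsupported type" error.
            throw new RuntimeException("Unsupported type passed for serialization: " + value);
        }
    }

So the pass-through String never reaches a DatumWriter; the wrapper type itself is rejected before serialization.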
JKCai commented 5 months ago

Hi team, is there any update on this issue? I want to integrate a Lenses connector (the S3 connector) with the AWS Glue Schema Registry and am facing a similar issue.