awslabs / aws-glue-schema-registry

AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Apache License 2.0

AWSKafkaAvroConverter throws "java.lang.NullPointerException" in absence of "avroRecordType" in connector config #128

Open blacktooth opened 2 years ago

blacktooth commented 2 years ago

While working with Kafka Connect and the Glue Schema Registry (GSR), "AWSKafkaAvroConverter" throws the exception below when the config values "key.converter.avroRecordType" and "value.converter.avroRecordType" are absent. The detailed stack trace and a sample connector configuration follow.

[2022-01-07 20:59:09,204] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:485)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:119)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:485)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:101)
    at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(GlueSchemaRegistryDeserializationFacade.java:162)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:149)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:117)
    ... 17 more
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:91)
    ... 21 more
Caused by: java.lang.NullPointerException
    at com.amazonaws.services.schemaregistry.deserializers.avro.DatumReaderInstance.from(DatumReaderInstance.java:37)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:112)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:109)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=db1.sampleavro.movies
s3.region=us-east-1
s3.bucket.name=<BUCKET NAME>
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
flush.size=10
store.kafka.keys=false
store.kafka.headers=false
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.compressionType=NONE
value.converter.compressionType=NONE
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.timeToLiveMillis=3600000
value.converter.timeToLiveMillis=3600000
key.converter.cacheSize=100
value.converter.cacheSize=100
key.converter.registry.name=msk-cdc-avro-keys
value.converter.registry.name=msk-cdc-avro-values
key.converter.compatibility=NONE
value.converter.compatibility=NONE
key.converter.description=none
value.converter.description=none
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
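
Since the failure above only surfaces once the sink task starts converting records, a pre-flight check on the connector properties can flag the missing setting earlier. The following is a minimal sketch, not part of the AWS Glue Schema Registry library: the class `ConverterConfigCheck` and its method are hypothetical names, and the only property names it relies on are the two quoted in this issue. `GENERIC_RECORD` is used purely as an example value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; it is not shipped with the library.
final class ConverterConfigCheck {
    // Property suffix and converter prefixes as quoted in this issue.
    static final String RECORD_TYPE_SUFFIX = "avroRecordType";
    static final String[] PREFIXES = {"key.converter.", "value.converter."};

    /** Returns the avroRecordType properties that are absent or blank in the given config. */
    static List<String> missingRecordTypeKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String prefix : PREFIXES) {
            String key = prefix + RECORD_TYPE_SUFFIX;
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Only the value converter is configured; the key converter is left out on purpose.
        config.put("value.converter.avroRecordType", "GENERIC_RECORD");
        System.out.println("Missing: " + missingRecordTypeKeys(config));
    }
}
```

Run against the sample configuration above, a check like this would report both `key.converter.avroRecordType` and `value.converter.avroRecordType` as missing.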

bhurwitz-yottaa commented 4 months ago

Was there any movement on this? I'm seeing the exact same error even though I am setting value.converter.avroRecordType=SPECIFIC_RECORD

forward2you commented 3 days ago

Hey team, we are also facing this bug. We are using Kafka Connect to deserialize Avro messages with this config:

    "value.converter.schemas.enable": "true",
    "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "value.converter.region": "xxx",
    "value.converter.avroRecordType": "SPECIFIC_RECORD",
    "value.converter.schemaAutoRegistrationEnabled": "false",
    "value.converter.schemaName": "xxx",
    "value.converter.registry.name": "xxx",
    "value.converter.compatibility": "NONE",
    "value.converter.dataFormat": "AVRO",
    "value.converter.compression": "ZLIB"

And we get this error:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:131)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:103)
    at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(GlueSchemaRegistryDeserializationFacade.java:172)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:161)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:126)
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:128)
    ... 18 more
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException: Cannot invoke "java.lang.Class.toString()" because "readerClass" is null
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2085)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4011)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4034)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5010)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:93)
    ... 22 more
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Class.toString()" because "readerClass" is null
    at com.amazonaws.services.schemaregistry.deserializers.avro.DatumReaderInstance.from(DatumReaderInstance.java:43)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:114)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:111)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3570)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2312)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2189)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2079)
    ... 26 more

We would like to know how to fix this.
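
The message in the trace above says that `readerClass` is null while `avroRecordType` is set to `SPECIFIC_RECORD`. One possible explanation, stated here as an assumption rather than a confirmed diagnosis, is that the class generated from the Avro schema cannot be loaded inside the Connect worker. The sketch below uses only the JDK to test whether a given class name is loadable. `SpecificClassProbe` is a hypothetical helper and `com.example.avro.Movie` is a placeholder name, not something taken from this thread.

```java
import java.util.Optional;

// Hypothetical diagnostic helper; it is not part of the library or of Kafka Connect.
final class SpecificClassProbe {
    /** Tries to load the named class without initializing it; empty if it cannot be loaded. */
    static Optional<Class<?>> tryLoad(String fullyQualifiedName) {
        try {
            ClassLoader loader = SpecificClassProbe.class.getClassLoader();
            // 'false' skips static initializers, so the probe has no side effects.
            return Optional.of(Class.forName(fullyQualifiedName, false, loader));
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Placeholder: pass the namespace + name of your Avro record as the first argument.
        String name = args.length > 0 ? args[0] : "com.example.avro.Movie";
        boolean loadable = tryLoad(name).isPresent();
        System.out.println(name + (loadable ? " is loadable" : " is NOT loadable"));
    }
}
```

The probe only reflects the classpath it is run with, so it would need to run with the same jars the converter plugin sees. If the generated class turns out not to be available there, `GENERIC_RECORD` may be worth trying, since generic records do not require generated classes.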