confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Deserialization Error: NullReferenceException #1426

Open indraneeldey1 opened 3 years ago

indraneeldey1 commented 3 years ago

Description

Hi there, I've been getting a NullReferenceException from the AvroDeserializer that I have not been able to resolve, even though data does come through. I'm following the AvroSpecific example found at https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific

The setup followed the base tutorial for working with Docker: https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

When consuming as a GenericRecord, however, there are no problems. I have found similar issues, but none quite like this one among the errors I've been able to dig up.

How to reproduce

Follow the AvroSpecific example as stated before, using the AvroDeserializer; the relevant consumer wiring is sketched below.
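For reference, here is roughly how the consumer is wired up, following the AvroSpecific example (a minimal sketch with placeholder broker/registry addresses; TestAvro is the generated class for the value schema):

using System;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

class Program
{
    static void Main()
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder
            GroupId = "avro-specific-test",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        {
            // Deserializer split out into its own variable, as in the attached program.
            var valueDeserializer = new AvroDeserializer<TestAvro>(schemaRegistry).AsSyncOverAsync();

            using (var consumer = new ConsumerBuilder<string, TestAvro>(consumerConfig)
                .SetValueDeserializer(valueDeserializer)
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build())
            {
                consumer.Subscribe("quickstart-jdbc-test");
                var cr = consumer.Consume(CancellationToken.None);   // the ConsumeException surfaces here
                Console.WriteLine($"consumed id: {cr.Message.Value.id}");
            }
        }
    }
}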

Checklist

Please provide the following information:

Basics

Found Errors

The initial error, which I caught in my own exception handler, was the following:

Confluent.Kafka.ConsumeException: Local: Value deserialization error
 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
   at Kafka.Consumer.Program.Main(String[] args) in C:\Users\idey\RiderProjects\Kafaka\Kafka.Consumer\Program.cs:line 50
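For reference, the error above was surfaced by a consume loop along these lines (a simplified fragment reusing the consumer from the sketch further up; the full program is attached below as Program.txt):

try
{
    var cr = consumer.Consume(CancellationToken.None);
    Console.WriteLine($"consumed: {cr.Message.Value}");
}
catch (ConsumeException e)
{
    // e.Error.Reason is "Local: Value deserialization error";
    // the NullReferenceException quoted above is the InnerException.
    Console.WriteLine($"Consume error: {e.Error.Reason}");
    Console.WriteLine(e.InnerException);
}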

Digging further inside the consumer (ConsumerBuilder -> Consume function, line 682), the following exception is shown:

System.NullReferenceException: Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)

Inspecting this.valueDeserializer shows that deserializeImpl is null.

Docker Compose Configuration

docker-compose.txt

I've attached the Docker Compose file, which contains ZooKeeper, Kafka, Schema Registry, Kafka Connect, and MySQL.

Consumer program

Program.txt

The consumer program follows the AvroSpecific example rather closely, though I did split the deserializers out into their own variables.

Avro Model

TestAvro.txt

The TestAvro file follows the example rather closely; the schema was grabbed through the Schema Registry API, though I did add in the namespace.

Data

Starting a shell session in the Schema Registry container, I ran the following command:

kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --property schema.registry.url="http://schema-registry:8081" --from-beginning --max-messages 10

and through that I got the following:

[2020-10-12 18:33:50,947] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-12 18:33:51,477] INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-45422
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-10-12 18:33:51,555] INFO Kafka version : 2.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-12 18:33:51,555] INFO Kafka commitId : 4b1dd33f255ddd2f (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-12 18:33:51,745] INFO Cluster ID: D2RhPuzwQwivasIv_93h8A (org.apache.kafka.clients.Metadata)
[2020-10-12 18:33:51,748] INFO [Consumer clientId=consumer-1, groupId=console-consumer-45422] Discovered group coordinator kafka:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-12 18:33:51,752] INFO [Consumer clientId=consumer-1, groupId=console-consumer-45422] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-12 18:33:51,752] INFO [Consumer clientId=consumer-1, groupId=console-consumer-45422] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-12 18:33:51,769] INFO [Consumer clientId=consumer-1, groupId=console-consumer-45422] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-12 18:33:51,770] INFO [Consumer clientId=consumer-1, groupId=console-consumer-45422] Setting newly assigned partitions [quickstart-jdbc-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-12 18:33:51,786] INFO [Consumer clientId=consumer-1, groupId=console-consumer-45422] Resetting offset for partition quickstart-jdbc-test-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
{"id":1,"name":{"string":"alice"},"email":{"string":"alice@abc.com"},"department":{"string":"engineering"},"modified":1602083662000}
{"id":2,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083662000}
{"id":3,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083662000}
{"id":4,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083662000}
{"id":5,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083663000}
{"id":6,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083663000}
{"id":7,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083663000}
{"id":8,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083663000}
{"id":9,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083663000}
{"id":10,"name":{"string":"bob"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":1602083664000}

Big thanks for reading through this!

mhowlett commented 3 years ago

how did you make TestAvro.txt? I think it's because there's no _SCHEMA field, which there would be if AvroGen had made it.
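for reference, the top of an avrogen-generated class looks roughly like this (a sketch with only the id field shown; the real generated file includes all fields and their Get/Put cases):

using Avro;
using Avro.Specific;

public partial class TestAvro : ISpecificRecord
{
    // avrogen emits this static schema field; the specific deserializer relies on it.
    public static Schema _SCHEMA = Schema.Parse(
        "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

    public virtual Schema Schema => _SCHEMA;

    public long id { get; set; }

    public virtual object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.id;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    public virtual void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.id = (long)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}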

btw, I would recommend Protobuf over Avro now that we've added it - the implementation in .NET is much more mature.

indraneeldey1 commented 3 years ago

I made the original class by hand, so I redid TestAvro.txt (really a .cs file) using AvroGen and added it back in, though I get the same behavior. I also added a key deserializer as well:

var keyDeserializer = new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync(); 

using (var consumer = new ConsumerBuilder<string, TestAvro>(consumerConfig)
                    .SetKeyDeserializer(keyDeserializer)
                    .SetValueDeserializer(valueDeserializer)
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build())

Now I get the same issue, just on the key; it says Local: Key deserialization error instead. I tried to set breakpoints on the class methods but found that execution never stepped into the class.

Confluent.Kafka.ConsumeException: Local: Key deserialization error
 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
   at Kafka.Consumer.Program.Main(String[] args) in C:\Users\idey\RiderProjects\Kafaka\Kafka.Consumer\Program.cs:line 51
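One thing I may try next (just a sketch, and an assumption on my part that the JDBC connector isn't actually Avro-encoding the keys, which I haven't confirmed) is to not use the Avro deserializer for the key at all, reusing consumerConfig and valueDeserializer from above:

// Option A: ignore the key entirely.
using (var consumer = new ConsumerBuilder<Ignore, TestAvro>(consumerConfig)
    .SetValueDeserializer(valueDeserializer)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build())
{
    // ...
}

// Option B: treat the key as a plain UTF-8 string.
using (var consumer = new ConsumerBuilder<string, TestAvro>(consumerConfig)
    .SetKeyDeserializer(Deserializers.Utf8)
    .SetValueDeserializer(valueDeserializer)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build())
{
    // ...
}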

Also, just to confirm, I was able to hit http://localhost:8081/schemas/ids/1 via Postman and got the following back:

{
    "schema": "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"department\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"modified\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}}],\"connect.name\":\"test\"}"
}
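The same check can also be scripted from .NET (a small sketch using HttpClient against the same endpoint, assuming the registry is reachable on localhost:8081):

using System;
using System.Net.Http;
using System.Threading.Tasks;

class SchemaCheck
{
    static async Task Main()
    {
        using var http = new HttpClient();
        // Same endpoint that was hit via Postman.
        var json = await http.GetStringAsync("http://localhost:8081/schemas/ids/1");
        Console.WriteLine(json);
    }
}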

I'll look into swapping over to Protobuf as well.