confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
53 stars 857 forks source link

AvroDeserializer unable to consume message #1032

Open tjvitry opened 5 years ago

tjvitry commented 5 years ago

Description

Created a schema with an array of records. Ran the avrogen utility which generated two classes, one for the schema and one for the embedded record type. Published a message using the AvroSerializer class. When attempting to consume the message get the following exception:

{"Local: Value deserialization error"} ConsumerRecord: {Confluent.Kafka.ConsumeResult<byte[], byte[]>} Data: {System.Collections.ListDictionaryInternal} Error: {Local: Value deserialization error} HResult: -2146233088 HelpLink: null InnerException: {"No matching schema for {\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Unit\",\"namespace\":\"com.centurylink.acms.event\",\"fields\":[{\"name\":\"unitNumber\",\"doc\":\"@required: false, @description: a specific unit number for an individual unit within a multi-dwelling unit, @examples: 1|101\",\"default\":null,\"type\":[\"null\",\"string\"]},{\"name\":\"type\",\"doc\":\"@required: false, @description: the type of the unit, @examples: Apartment|Building\",\"default\":null,\"type\":[\"null\",\"string\"]},{\"name\":\"story\",\"doc\":\"@required: false, @description: the story or floor number for the unit, @examples: 1|2|3\",\"default\":null,\"type\":[\"null\",\"string\"]},{\"name\":\"fiberCount\",\"doc\":\"@required: false, @description: the number of fibers available at the unit, @examples: 1|4\",\"default\":null,\"type\":[\"null\",\"string\"]}]}} in {\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"unit\",\"namespace\":\"com.centurylink.acms. event\",\"fields\":[{\"name\":\"unitNumber\",\"doc\":\"@required: false, @description: a specific unit number for an individual unit within a multi-dwelling unit, @examples: 1|101\",\"default\":null,\"type\":[\"null\",\"string\"]},{\"name\":\"type\",\"doc\":\"@required: false, @description: the type of the unit, @examples: Apartment|Building\",\"default\":null,\"type\":[\"null\",\"string\"]},{\"name\":\"story\",\"doc\":\"@required: false, @description: the story or floor number for the unit, @examples: 1|2|3\",\"default\":null,\"type\":[\"null\",\"string\"]},{\"name\":\"fiberCount\",\"doc\":\"@required: false, @description: the number of fibers available at the unit, @examples: 1|4\",\"default\":null,\"type\":[\"null\",\"string\"]}]}}]} in field units"} Message: "Local: Value deserialization error" Source: "Confluent.Kafka" StackTrace: " at Confluent.Kafka.Consumer2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer1 keyDeserializer, IDeserializer1 valueDeserializer)\r\n at Confluent.Kafka.Consumer2.Consume(CancellationToken cancellationToken)\r\n at GeoESWS.Consumer.Listen(String message) in C:\Users\xxx\source\repos\geoesws\Consumer.cs:line 88" TargetSite: {Confluent.Kafka.ConsumeResult2[K,V] ConsumeImpl[K,V](Int32, Confluent.Kafka.IDeserializer1[K], Confluent.Kafka.IDeserializer`1[V])}

How to reproduce

Abbreviated schema used:

{ "type": "record", "name": "NewConstructionAddressEvent", "namespace": "com.centurylink.acms.event", "doc": "@author: xxx, @description: Avro Schema for an address in a new construction area", "fields": [ { "name": "eventId", "type": { "type": "string", "avro.java.string": "String" }, "doc": "@required: true, @description: unique id (UUID version 4 and variant 2) for an event, @examples: d15f36fe-ab1e-4d5c-9a04-a1827ac0c330" }, { "name": "units", "type": [ "null", { "type": "array", "items": { "type": "record", "name": "unit", "fields": [ { "name": "unitNumber", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: a specific unit number for an individual unit within a multi-dwelling unit, @examples: 1|101", "default": null }, { "name": "type", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: the type of the unit, @examples: Apartment|Building", "default": null }, { "name": "story", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: the story or floor number for the unit, @examples: 1|2|3", "default": null }, { "name": "fiberCount", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: the number of fibers available at the unit, @examples: 1|4", "default": null } ]
} } ], "doc": "@required: false, @description: unit numbers will be available for multi-dwelling unit - demand points, @examples: unit number details", "default": null } ] }

Code snippet:

try { while (true) { try { var schemaRegistryConfig = new SchemaRegistryConfig { // Note: you can specify more than one schema registry url using the // schema.registry.url property for redundancy (comma separated list). // The property name is not plural to follow the convention set by // the Java implementation. SchemaRegistryUrl = configuration.GetConnectionString("SchemaRegistryUrl"), // optional schema registry client properties: SchemaRegistryRequestTimeoutMs = 5000, SchemaRegistryMaxCachedSchemas = 10 };
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)) using (IConsumer<string, NewConstructionAddressEvent> consumer = new ConsumerBuilder<string, NewConstructionAddressEvent>(config) //using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers}) .SetKeyDeserializer(new AvroDeserializer(schemaRegistry).AsSyncOverAsync()) .SetValueDeserializer(new AvroDeserializer(schemaRegistry).AsSyncOverAsync()) .Build()) { consumer.Subscribe(topic1.DevTopic);

                        while (true)
                        {
                            ConsumeResult<string, NewConstructionAddressEvent> consumeResult = consumer.Consume(cts);//TimeSpan.FromMilliseconds(50000)//new TimeSpan(0,0,1)

                            if (consumeResult != null)
                            {
                                Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");

                            }
                            else
                            {
                                //Debug.Log("consumer Result is null");
                            }

                        }
                    }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }

Checklist

Please provide the following information:

Ram-97 commented 4 years ago

@tjvitry I think, you have missed to mention the custom class type in AvroDeserializer.

var consumer = new ConsumerBuilder<string, NewConstructionAddressEvent>(consumerConfig) .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync()) .SetValueDeserializer(new AvroDeserializer<NewConstructionAddressEvent>(schemaRegistry).AsSyncOverAsync())

Here is the example program for custom class instead of GenericRecord

I am a begineer. So if i am wrong, please help me to understand default behavior of AvroDeserializer.

lamrani00 commented 1 year ago

i have the same issue because in my errors i found Logical type 'eda-local-datetime' is not supported