confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

GenericRecord required to write against record schema but found Avro.Generic.GenericRecord #1153

Open · manudc opened this issue 4 years ago

manudc commented 4 years ago

Description

Same as issue #1045, I got the same error: "GenericRecord required to write against record schema but found System.String in field FUNDS".

The problem is that I can't populate a nested record type. The structure is roughly:

record
  - field (level 1)
  - record (level 1)
    - field (level 2)
    - field (level 2)
  - field (level 1)

Do you have sample code in C# that uses GenericRecord to write to a Kafka topic with ProduceAsync and a nested structure?

The exception is thrown in the Confluent.SchemaRegistry.Serdes.GenericSerializerImpl class, at the call new GenericWriter(writerSchema).Write(data, new BinaryEncoder(stream));
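For reference, a minimal sketch of the pattern in question, assuming Apache Avro's C# API (Schema.Parse, the RecordSchema field indexer, GenericRecord.Add); the schema and field names here are placeholders:

using Avro;
using Avro.Generic;

class NestedRecordSketch
{
    static void Main()
    {
        // A parent record schema that embeds a nested record (placeholder names).
        var parentSchema = (RecordSchema)Schema.Parse(@"{
            ""type"": ""record"", ""name"": ""Parent"",
            ""fields"": [
                {""name"": ""id"", ""type"": ""string""},
                {""name"": ""child"", ""type"": {
                    ""type"": ""record"", ""name"": ""Child"",
                    ""fields"": [{""name"": ""amount"", ""type"": ""long""}]
                }}
            ]
        }");

        // Build the nested record against the nested schema object taken from
        // the parent schema, rather than against a separately parsed copy.
        var childSchema = (RecordSchema)parentSchema["child"].Schema;
        var child = new GenericRecord(childSchema);
        child.Add("amount", 42L);

        var parent = new GenericRecord(parentSchema);
        parent.Add("id", "abc");
        parent.Add("child", child);
    }
}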


orendin commented 4 years ago

Hi all

I am currently running into the same issue as reported above and in #1045. I'm not sure whether this is a bug or something I'm doing wrong. To reproduce, I have used an extended version of your AvroGeneric example (see below).

With this, you can repeatedly reproduce the following error message: Confluent.Kafka.ProduceException`2[System.String,Avro.Generic.GenericRecord]: Local: Value serialization error ---> Avro.AvroException: GenericRecord required to write against record schema but found Avro.Generic.GenericRecord in field Metadata.

Not sure if my schemas are wrong or what the problem might be.

I'm currently using the latest version of Kafka (from the cp-all-in-one Docker examples). The example uses Confluent.Apache.Avro 1.7.7.7, and I'm on Windows 10.

I'd appreciate any hint on what might be wrong.

Thanks

Ben

// Copyright 2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Confluent.Kafka.Examples.AvroGeneric
{
    class Program
    {
        static async Task Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
                return;
            }

            string bootstrapServers = args[0];
            string schemaRegistryUrl = args[1];
            string topicName = args[2];
            string groupName = "avro-generic-example-group";

            // var s = (RecordSchema)RecordSchema.Parse(File.ReadAllText("my-schema.json"));
            var s = (RecordSchema)RecordSchema.Parse(
                @"{
                    ""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
                    ""type"": ""record"",
                    ""name"": ""User"",
                    ""fields"": [
                        {""name"": ""name"", ""type"": ""string""},
                        {""name"": ""favorite_number"",  ""type"": [""int"", ""null""]},
                        {""name"": ""favorite_color"", ""type"": [""string"", ""null""]},
                        {""name"": ""Metadata"",
                              ""doc"":  ""Mandatory metadata for all event types"",
                              ""type"": {
                                ""namespace"": ""SomeOtherNamespace"",
                                ""name"": ""Metadata"",
                                ""type"": ""record"",
                                ""fields"": [
                                  {
                                    ""name"": ""AggregateId"",
                                    ""type"": ""string"",
                                    ""doc"": ""Identifies the aggregate""
                                  },
                                  {
                                    ""name"": ""TraceId"",
                                    ""type"": ""string"",
                                    ""doc"": ""UUID according to ISO/IEC 11578:1996. The TraceId identifies the event uniquely and must be logged and transported to all processing entities in the context of this event. Format pattern: Pattern.compile('([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12})'""
                                  },
                                  {
                                    ""doc"": ""Timestamp of message on the global timeline, independent of a particular time zone or calendar, with a precision of one microseconds."",
                                    ""name"": ""Timestamp"",
                                    ""type"": {
                                      ""type"": ""long"",
                                      ""logicalType"": ""timestamp-micros""
                                    }
                                  }
                                ]
                              }
                            }
                    ]
                  }"
            );

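            // The Metadata schema is parsed a second time here as a standalone
            // schema. Note that its doc strings are not identical to the nested
            // Metadata definition inside 's' above (the AggregateId doc differs).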
            var t = (RecordSchema)RecordSchema.Parse(@"{
  ""namespace"": ""SomeOtherNamespace"",
  ""name"": ""Metadata"",
  ""type"": ""record"",
  ""fields"": [
    {
      ""name"": ""AggregateId"",
      ""type"": ""string"",
      ""doc"": ""Identifies the aggregate taking into account that this aggregate could be emitted / created from different IDP instances. Therefore it is a compound (concatenation) of the Sub and the IssuerIdentifier.""
    },
    {
      ""name"": ""TraceId"",
      ""type"": ""string"",
      ""doc"": ""UUID according to ISO/IEC 11578:1996. The TraceId identifies the event uniquely and must be logged and transported to all processing entities in the context of this event. Format pattern: Pattern.compile('([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12})'""
    },
    {
      ""doc"": ""Timestamp of message on the global timeline, independent of a particular time zone or calendar, with a precision of one microseconds."",
      ""name"": ""Timestamp"",
      ""type"": {
        ""type"": ""long"",
        ""logicalType"": ""timestamp-micros""
      }
    }
  ]
}");

            CancellationTokenSource cts = new CancellationTokenSource();
            var consumeTask = Task.Run(() =>
            {
                using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
                using (var consumer =
                    new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
                        .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                        .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                        .Build())
                {
                    consumer.Subscribe(topicName);

                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var consumeResult = consumer.Consume(cts.Token);

                                Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Consume error: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // commit final offsets and leave the group.
                        consumer.Close();
                    }
                }
            });

            using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
            using (var producer =
                new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
                    .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                    .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build())
            {
                Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

                int i = 0;
                string text;
                while ((text = Console.ReadLine()) != "q")
                {
                    var record = new GenericRecord(s);
                    record.Add("name", text);
                    record.Add("favorite_number", i++);
                    record.Add("favorite_color", "blue");

                    var metadataRecord = new GenericRecord(t);
                    metadataRecord.Add("AggregateId", Guid.NewGuid().ToString());
                    metadataRecord.Add("TraceId", Guid.NewGuid().ToString());
                    metadataRecord.Add("Timestamp", DateTime.Now.Ticks);

                    record.Add("Metadata", metadataRecord);

                    await producer
                        .ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                        .ContinueWith(task =>
                        {
                            var result = task.IsFaulted
                                ? $"error producing message: {task.Exception}"
                                : $"produced to: {task.Result.TopicPartitionOffset}";

                            Console.WriteLine(result);

                            return result;
                        });
                }
            }

            cts.Cancel();
        }
    }
}
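A possible explanation, assuming Avro's C# GenericWriter compares record schemas structurally including field doc strings: the standalone Metadata schema t above is not identical to the nested Metadata definition inside s (for example, the AggregateId doc text differs), so the GenericRecord built against t fails the writer's schema check and raises the "found Avro.Generic.GenericRecord" error. A workaround sketch, reusing the nested schema object from the parent instead of parsing it twice:

// Workaround sketch: take the nested Metadata schema from the parent schema 's'
// so the nested GenericRecord is built against the exact schema the writer expects.
var metadataSchema = (RecordSchema)s["Metadata"].Schema;

var metadataRecord = new GenericRecord(metadataSchema);
metadataRecord.Add("AggregateId", Guid.NewGuid().ToString());
metadataRecord.Add("TraceId", Guid.NewGuid().ToString());
metadataRecord.Add("Timestamp", DateTime.Now.Ticks);

record.Add("Metadata", metadataRecord);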