confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
81 stars 866 forks source link

AvroSerializer/SchemaRegistry works on .NET Core but not .NET Framework 4.5.2 #955

Open xiaojie2019 opened 5 years ago

xiaojie2019 commented 5 years ago

Description

I have following the sample code (https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific) to build a Kafka Avro Producer with C# code. But it works perfectly on .NET Core project, but failed on .NET Framework 4.5.2. Failed reason is key and value serialization error.

How to reproduce

Build and Run the code provided below on Visual Studio 2015 Target Framework 4.5.2

Checklist

Please provide the following information:

public class Program { public static void Main(string[] args) { try { string bootstrapServer = "myBootstrapServer"; string topicName = "myTopicName"; string schemaRegistryUrl = "myUrl"; var saslUsername = "username"; var saslPassword="password"; var sslCaLocation = "sslCaLocation"

            var config = new ProducerConfig
            {
                BootstrapServers = bootstrapServer,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.ScramSha512,
                SaslUsername = saslUsername,
                SaslPassword = saslPassword,
                SslCaLocation = sslCaLocation,
            };

            var schemaRegistryConfig = new SchemaRegistryConfig
            {
                SchemaRegistryUrl = schemaRegistryUrl,
                SchemaRegistryBasicAuthCredentialsSource = AuthCredentialsSource.UserInfo,
                SchemaRegistryBasicAuthUserInfo = saslUsername + ":" + saslPassword,
                SchemaRegistryRequestTimeoutMs = 5000,
                SchemaRegistryMaxCachedSchemas = 10
            };

            using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
            using (var producer =
                    new ProducerBuilder<string, RepublishRequest>(config)
                        .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                        .SetValueSerializer(new AvroSerializer<RepublishRequest>(schemaRegistry))
                        .Build())
            {
                RepublishRequest request = new RepublishRequest { EntityType = EntityType.RMSSKU,
                    EntityId = "53486682",
                    Market = Market.US,
                    SellingChannel = SellingChannel.RACK,
                    ApplicationId = "TestApp"
                };

                byte[] val = SerializerAvro.Serialize(request);
                var task = producer.ProduceAsync(topicName, new Message<string, RepublishRequest> { Key = "test", Value = request });
                task.Wait();

                if (task.IsFaulted) {
                    Console.Write($"error producing message: {task.Exception.Message}");
                } else {
                    Console.Write($"produced to: {task.Result.TopicPartitionOffset}");
                }
                producer.Flush(TimeSpan.FromSeconds(10));
            }
        } catch (Exception e) {
            Console.Write($"EXCEPTION: {e}");
        }
    }

}

RepublishRequest is auto generated Avro class.

EXCEPTION: System.AggregateException: One or more errors occurred. ---> Confluent.Kafka.ProduceException2[System.String,com.nordstrom.rms.republish.RepublishRequest]: Local: Key serialization error ---> System.Net.Http.HttpRequestException: [https://schema-registry.nonprod.us-west-2.aws.proton.nordstrom.com/] HttpRequestException: An error occurred while sending the request. at Confluent.SchemaRegistry.RestService.<ExecuteOnOneInstanceAsync>d__5.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.RestService.<RequestAsync>d__61.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.RestService.d14.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.CachedSchemaRegistryClient.d13.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl1.<Serialize>d__10.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.Serdes.AvroSerializer1.d6.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task) at Confluent.Kafka.Producer`2.d51.MoveNext() --- End of inner exception stack trace --- at Confluent.Kafka.Producer2.<ProduceAsync>d__51.MoveNext() --- End of inner exception stack trace --- at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions) at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken) at System.Threading.Tasks.Task.Wait() at Confluent.Kafka.Examples.SimpleProducer.Program.Main(String[] args) in C:\MyWork\TestApplication\SimpleKafkaProducer\Program.cs:line 79 ---> (Inner Exception #0) Confluent.Kafka.ProduceException2[System.String,com.nordstrom.rms.republish.RepublishRequest]: Local: Key serialization error ---> System.Net.Http.HttpRequestException: [https://schema-registry.nonprod.us-west-2.aws.proton.nordstrom.com/] HttpRequestException: An error occurred while sending the request. at Confluent.SchemaRegistry.RestService.d5.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.RestService.d61.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.RestService.<RegisterSchemaAsync>d__14.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.CachedSchemaRegistryClient.<RegisterSchemaAsync>d__13.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl1.d10.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.d6.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task) at Confluent.Kafka.Producer2.<ProduceAsync>d__51.MoveNext() --- End of inner exception stack trace --- at Confluent.Kafka.Producer2.d__51.MoveNext()<--- Enter to exit

aquiros-modus commented 5 years ago

Hello. Any update related to this issue? Same case for me. Visual Studio 2015 Target Framework 4.5.2

Anhbta commented 4 years ago

Same issue with .net core 3.x

timarape commented 4 years ago

I am facing the same error with .net 4.6.1

mhowlett commented 4 years ago

in the stack trace above, the inner exception is System.Net.Http.HttpRequestException, which indicates the serializer had trouble communicating with schema registry.

@magician123 , @Anhbta , @aquiros-modus - do you also see HttpRequestException, or is the underlying problem something else? I can't think of a reason this would occur in .net framework but not .net core - the execution paths are not different. it seems like at the very least there is a common trap here somewhere though.

timarape commented 4 years ago

@confluentinc it was actually the certs that had issues in my case, its now resolved. Thanks.

Ealenn commented 4 years ago

The serialization error was throw when credentials or registry configuration was incorrect.

mhowlett commented 4 years ago

marking as enhancement since I think we probably want to surface a different top level exception (that derives from ProduceException) for various common problems with SR integration.

bishal1990 commented 4 years ago

Local: Key serialization error issue please help

WhiskyLiu commented 3 years ago

Hello. Any update related to this issue? Same case for me. Visual Studio 2017 Target Framework 4.6.1. Please help...

vishwajeets7 commented 3 years ago

@mhowlett Do you have any updates to share? or any work around on this issue. Looks like common for .net platform. Earliest resolution would be appreciated.

Facing this issue for value deserialization as well, when using avro deserializer.

Thanks in advance.

jake0029 commented 3 years ago

@mhowlett do we have any updates on this?

saloniapatil commented 1 week ago

@timarape : Were you able to make it work with .net 4.6.1 or did you upgrade?