confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

InvalidDataException when Proto files are out of date. How can we make this a non-stopping error? #2167

Closed saadk01 closed 8 months ago

saadk01 commented 8 months ago

Description

We are using ProtobufDeserializer to deserialize incoming messages from a producer. Recently, the producer changed its Proto files in the test environment, which caused a deserialization error on our end and forced the whole application to stop.

Confluent.Kafka.ConsumeException: Local: Value deserialization error
 ---> System.IO.InvalidDataException: Expecting message Value with Confluent Schema Registry framing. Magic byte was 10, expecting 0
   at Confluent.SchemaRegistry.Serdes.ProtobufDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
   at ....cs:line 80

We will update the Proto files, but we were wondering whether something like the ErrorHandlingDeserializer available in Spring Kafka could be implemented, so that a deserialization error thrown in ProtobufDeserializer.DeserializeAsync is logged and handled instead of stopping the application.

Spring Kafka Reference: https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/
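
For illustration, here is a minimal sketch of the behavior we are after, done in the consume loop rather than in the deserializer. It catches the ConsumeException shown in the stack trace above, logs it, and keeps consuming. The TolerantConsumeLoop class, its Run method, and the handle callback are hypothetical names introduced only for this example; it assumes that the next Consume call moves on to the following record, which should be verified against the client version in use.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class TolerantConsumeLoop
{
    // Consumes until cancelled, logging and skipping records whose
    // key or value cannot be deserialized instead of letting the
    // exception stop the application.
    public static void Run<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        Action<ConsumeResult<TKey, TValue>> handle,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(cancellationToken);
                handle(result);
            }
            catch (ConsumeException e) when (
                e.Error.Code == ErrorCode.Local_ValueDeserialization ||
                e.Error.Code == ErrorCode.Local_KeyDeserialization)
            {
                // ConsumerRecord carries the raw (byte[]) form of the record
                // that failed, so its position can be logged.
                Console.Error.WriteLine(
                    $"Skipping record at {e.ConsumerRecord?.TopicPartitionOffset}: " +
                    $"{e.InnerException?.Message ?? e.Error.Reason}");
            }
            catch (OperationCanceledException)
            {
                // Cancellation was requested while blocked in Consume.
                break;
            }
        }
    }
}
```

The exception filter keeps other ConsumeException cases (for example broker or authorization errors) propagating as before; only deserialization failures are treated as non-stopping.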

How to reproduce

Consumer instantiation:

using var consumer = new ConsumerBuilder<string, ProtobufType>(config)
        .SetValueDeserializer(new ProtobufDeserializer<ProtobufType>().AsSyncOverAsync())
        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
        .Build();
consumer.Subscribe(topicname);

Confluent Kafka Version: 1.8.2.0

Consumer configuration:

BootstrapServers = ...,
GroupId = ...,
SecurityProtocol = SecurityProtocol....,
SaslMechanism = SaslMechanism....,
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm....,
SaslUsername = ...,
SaslPassword = ...

Checklist

Please provide the following information: