confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Key deserialization error #1392

Open MrLuje opened 4 years ago

MrLuje commented 4 years ago

Description

We have some consumers running in a GKE cluster, consuming from a Confluent Cloud cluster with its provided schema registry (Avro). From time to time, consumers are unable to consume due to a "Key deserialization error", which is only solved by restarting our app anywhere from one to several times until it works... so it is unlikely to be a message issue (if a specific message were the cause, the consumer would be stuck on it forever).

How to reproduce

We are using a consumer flow like the one described in the AvroSpecific example. The main difference is that we are starting multiple consumers (one per topic) in the same program; each consumer is created in a separate thread and consumes its messages on that thread.
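A simplified sketch of that setup (the topic names, schema registry URL, the Avro-generated MySpecificRecord class and the BuildConsumerConfig helper are placeholders; the real consumer config is the one listed in the checklist below):

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

class Program
{
    static void Main()
    {
        var topics = new[] { "topic-a", "topic-b" };   // placeholder topic names
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "https://xxx.gcp.confluent.cloud" }; // placeholder
        var cts = new CancellationTokenSource();

        var workers = new List<Task>();
        foreach (var topic in topics)
        {
            workers.Add(Task.Factory.StartNew(() =>
            {
                using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

                // The Avro deserializers fetch the writer schema from schema registry the first
                // time they see a schema id; that HTTP call is what fails in the stack trace below.
                using var consumer = new ConsumerBuilder<string, MySpecificRecord>(BuildConsumerConfig())
                    .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                    .SetValueDeserializer(new AvroDeserializer<MySpecificRecord>(schemaRegistry).AsSyncOverAsync())
                    .Build();

                consumer.Subscribe(topic);
                while (!cts.IsCancellationRequested)
                {
                    var result = consumer.Consume(cts.Token); // throws ConsumeException on deserialization failure
                    // ... handle result.Message ...
                }
            }, TaskCreationOptions.LongRunning));
        }

        Task.WaitAll(workers.ToArray());
    }
}

The Consume call then intermittently fails with the following exception: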

Confluent.Kafka.ConsumeException: Local: Key deserialization error
 ---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
 ---> System.IO.IOException: Unable to read data from the transport connection: Operation canceled.
 ---> System.Net.Sockets.SocketException: Operation canceled
   --- End of inner exception stack trace ---
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error)
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.GetResult(Int16 token)
   at System.Net.Security.SslStreamInternal.<FillBufferAsync>g__InternalFillBufferAsync|38_0[TReadAdapter](TReadAdapter adap, ValueTask`1 task, Int32 min, Int32 initial)
   at System.Net.Security.SslStreamInternal.ReadAsyncInternal[TReadAdapter](TReadAdapter adapter, Memory`1 buffer)
   at System.Net.Http.HttpConnection.FillAsync()
   at System.Net.Http.HttpConnection.ReadNextResponseHeaderLineAsync(Boolean foldedHeadersAllowed)
   at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, CancellationToken cancellationToken)
   at System.Net.Http.HttpConnectionPool.SendWithNtConnectionAuthAsync(HttpConnection connection, HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
   at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
   at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
   at System.Net.Http.HttpClient.FinishSendAsyncBuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
   at Confluent.SchemaRegistry.RestService.ExecuteOnOneInstanceAsync(Func`1 createRequest)
   at Confluent.SchemaRegistry.RestService.RequestAsync[T](String endPoint, HttpMethod method, Object[] jsonBody)
   at Confluent.SchemaRegistry.RestService.GetSchemaAsync(Int32 id, String format)
   at Confluent.SchemaRegistry.CachedSchemaRegistryClient.GetSchemaAsync(Int32 id, String format)
   at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
   at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)

From what we are seeing, it may be caused by the schema registry being unreachable on the first request. When we get a "Key deserialization error", we treat it as a fatal error and stop the consumer.
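Roughly, our consume loop treats these errors as fatal like this (a sketch continuing the example above, not our exact code):

try
{
    var result = consumer.Consume(cts.Token);
    // ... process result ...
}
catch (ConsumeException e) when (e.Error.Code == ErrorCode.Local_KeyDeserialization ||
                                 e.Error.Code == ErrorCode.Local_ValueDeserialization)
{
    // The inner exception is the TaskCanceledException / IOException from the
    // schema registry HTTP call shown in the stack trace above.
    cts.Cancel();        // stop this consumer; the app then has to be restarted
    consumer.Close();
}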

Checklist

Please provide the following information:

new ConsumerConfig
            {
                BootstrapServers = "xxx.gcp.confluent.cloud:9092",
                SaslUsername = <username>,
                SaslPassword = <password>,
                GroupId = Guid.NewGuid().ToString(),
                AutoOffsetReset = AutoOffsetReset.Earliest,
                SaslMechanism = SaslMechanism.Plain,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                Acks = Acks.Leader,
                EnableAutoCommit = false,
                MaxPollIntervalMs = (int)TimeSpan.FromMinutes(10).TotalMilliseconds,
                SessionTimeoutMs = (int)TimeSpan.FromSeconds(10).TotalMilliseconds,
                EnablePartitionEof = true
            };
stefan-benjamin commented 4 years ago

I am experiencing the same issue when running in a docker container. It seems that the Schema Registry client uses the configured/provided timeout for each HttpClient used to access the REST API, and the same value seems to be used by the Kafka client when trying to resolve the schema. If the first schema registry URL fails after the specified timeout, the second is never tried because the consume/produce exception has already been thrown.
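For reference, this is the configuration in question (a sketch with placeholder URLs; Url accepts a comma-separated list of endpoints, and RequestTimeoutMs ends up as the timeout of the HttpClient created for each endpoint):

using Confluent.SchemaRegistry;

var schemaRegistryConfig = new SchemaRegistryConfig
{
    // Two endpoints; in theory only one needs to be reachable, but as described above,
    // if the first one times out the exception escapes before the second is tried.
    Url = "https://schema-registry-1.internal:8081,https://schema-registry-2.internal:8081",
    RequestTimeoutMs = 30000   // 30 s default; the timeout discussed in this thread
};

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);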

I also tried on a Windows machine and the behavior cannot be reproduced there - it seems the OS causes the HttpClient to fail before the specified timeout when the URL cannot be reached. These are my findings; I hope they help.

Please let me know if you find any solution/workaround for this issue.

mhowlett commented 4 years ago

thanks both. so task cancelled exception and possible opportunity for better error handling / retry logic.

the task cancellation exception does suggest a timeout (i think), so maybe setting SchemaRegistryRequestTimeoutMs high will help, though i'm skeptical.

i'll have a conversation with the cloud people to alert them / understand if this is a known issue.

i'm thinking we should put some configurable retry logic in ExecuteOnOneInstanceAsync.
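Not the library's actual code, but a sketch of what retry/fallback inside ExecuteOnOneInstanceAsync could look like (the helper name and the maxAttemptsPerEndpoint knob are hypothetical): treat a per-endpoint timeout (TaskCanceledException) like a connection failure and move on to the next configured URL instead of surfacing it to the caller.

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;

internal static class RestRetrySketch
{
    public static async Task<HttpResponseMessage> SendWithFallbackAsync(
        IReadOnlyList<HttpClient> clients,           // one HttpClient per configured schema registry URL
        Func<HttpRequestMessage> createRequest,
        int maxAttemptsPerEndpoint = 1)              // the "configurable retry" knob (hypothetical)
    {
        Exception lastError = null;
        foreach (var client in clients)
        {
            for (var attempt = 0; attempt < maxAttemptsPerEndpoint; attempt++)
            {
                try
                {
                    return await client.SendAsync(createRequest()).ConfigureAwait(false);
                }
                catch (Exception e) when (e is HttpRequestException || e is TaskCanceledException)
                {
                    // Timeout (TaskCanceledException) or connection failure: remember it and
                    // fall through to the next attempt / next endpoint instead of throwing.
                    lastError = e;
                }
            }
        }
        throw new HttpRequestException("All schema registry endpoints failed.", lastError);
    }
}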

stefan-benjamin commented 4 years ago

Thank you for the quick response @mhowlett.

Setting SchemaRegistryRequestTimeoutMs to a very high value does indeed help, as the HttpClient call to the first URL fails (most likely induced by the OS) before the total timeout specified by the configuration expires. However, to get working results I had to set this value to 200000, and the Schema Registry client was only able to connect to the second URL and consume messages after about 120-130 sec... Again, this happens only in the docker environment, not when running the application on Windows. It is also a little inconvenient that the application has to wait so long before being able to consume/produce the first message if the first schema registry node is down.

I am looking forward to a more robust solution. I am also wondering whether this behavior could be caused by the docker networking environment..

Thanks again, I look forward to hearing updates 👍

stefan-benjamin commented 3 years ago

After further investigation, I found out that the HttpClient used in ExecuteOnOneInstanceAsync in some cases throws a TaskCanceledException on timeout, according to https://github.com/dotnet/runtime/issues/21965. This exception is not handled in ExecuteOnOneInstanceAsync and is therefore thrown to the caller, without the RestService advancing to the next HttpClient based on the configured URIs. Could this be the cause of the issue? I made a very small application that only uses the CachedSchemaRegistryClient for a single query: 3 URIs are configured with the default timeout of 30 seconds, and the first 2 machines are shut down (leaving only the third available). The following stack trace is produced. I hope this helps.

[screenshot: the resulting stack trace]
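The repro is roughly this (host names are placeholders; with the first two endpoints down and the default 30 s timeout, the GetSchemaAsync call surfaces the TaskCanceledException from the first endpoint instead of falling through to the third):

using System;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;

class Repro
{
    static async Task Main()
    {
        var config = new SchemaRegistryConfig
        {
            // the first two hosts are shut down; only the third is reachable
            Url = "http://sr-down-1:8081,http://sr-down-2:8081,http://sr-up:8081",
            RequestTimeoutMs = 30000   // default
        };

        using var client = new CachedSchemaRegistryClient(config);

        // A single lookup is enough to trigger the behaviour; schema id 1 is arbitrary.
        var schema = await client.GetSchemaAsync(1);
        Console.WriteLine(schema.SchemaString);
    }
}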

laurentaubin commented 3 years ago

Is there any more info on how to fix this ? I'm experiencing the same issue in a docker environment.

Edit: If anyone comes across this issue, I've found that rebooting the host machine can work as a dirty fix. I would love it to be more reliable than that, however!

https://stackoverflow.com/questions/51092145/confluent-3-3-unable-to-connect-to-schema-registry-locally-with-kafka-avro-conso

GuyG-Saha commented 1 year ago

Hi everyone, I am experiencing the "Local: Key deserialization error" exception on Confluent Platform version 7.3.2, using Confluent.Kafka library version 2.0.2 on Ubuntu, without using Schema Registry at all. The exception is thrown when the consumer first tries to consume from the topic.

Any ideas on how to solve this without restarting the app?
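Not a fix for the underlying error, but one workaround pattern (a sketch; the config, topic and failure threshold are placeholders) is to catch the ConsumeException in the poll loop and, after repeated failures, dispose and rebuild the consumer instead of stopping the whole app:

using System;
using System.Threading;
using Confluent.Kafka;

static class ResilientConsumer
{
    public static void ConsumeLoop(ConsumerConfig config, string topic, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            using var consumer = new ConsumerBuilder<string, string>(config).Build();
            consumer.Subscribe(topic);
            var consecutiveFailures = 0;

            try
            {
                while (!ct.IsCancellationRequested)
                {
                    try
                    {
                        // On shutdown, cancellation surfaces as OperationCanceledException here.
                        var result = consumer.Consume(ct);
                        consecutiveFailures = 0;
                        // ... process result.Message ...
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"consume error: {e.Error.Reason}");
                        if (++consecutiveFailures >= 5)
                            break;                   // give up on this instance and rebuild it
                    }
                }
            }
            finally
            {
                consumer.Close();                    // leave the group cleanly before disposing
            }
        }
    }
}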