masesgroup / KEFCore

Entity Framework Core provider for Apache Kafka™
https://kefcore.masesgroup.com/
Apache License 2.0
6 stars 1 forks source link

Deserialization fails with empty arrays #249

Closed masesdevelopers closed 3 weeks ago

masesdevelopers commented 3 weeks ago

Within https://github.com/masesgroup/KEFCore/actions/runs/9655461268 many tests fails. Here the summary:

* .NET: net8.0

* OS

  * Windows

    * JDK: Corretto 21
  * Linux

    * Oracle 21
    * Zulu 21
    * Temurin 21
    * Microsoft 21
  * MacOS 13 and MacOS 14 (macos-latest)

    * Corretto 21
    * Microsoft 21
    * Temurin 21
    * Oracle 21
    * Zulu 21

The errors occurs always in net8.0, and sparsely for many JDK 21, when the array of bytes of key related to the Kafka record is retrieved from the JVM. However the problem is raised in many way within KNet code, so the issue shall be solved there.

Executing other tests, some exceptions are raised within KEFCore itself:

Unhandled exception. System.Text.Json.JsonException: The input does not contain any JSON tokens. Expected the input to start with a valid JSON token, when isFinalBlock is true. Path: $ | LineNumber: 0 | BytePositionInLine: 0.
 ---> System.Text.Json.JsonReaderException: The input does not contain any JSON tokens. Expected the input to start with a valid JSON token, when isFinalBlock is true. LineNumber: 0 | BytePositionInLine: 0.
   at System.Text.Json.ThrowHelper.ThrowJsonReaderException(Utf8JsonReader& json, ExceptionResource resource, Byte nextByte, ReadOnlySpan`1 bytes)
   at System.Text.Json.Utf8JsonReader.Read()
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   --- End of inner exception stack trace ---
   at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, JsonReaderException ex)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo`1 jsonTypeInfo, Nullable`1 actualByteCount)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](ReadOnlySpan`1 utf8Json, JsonSerializerOptions options)
   at MASES.EntityFrameworkCore.KNet.Serialization.Json.DefaultKEFCoreSerDes.ValueContainer`1.JsonRaw`1.DeserializeWithHeaders(String topic, Headers headers, Byte[] data) in /_/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs:line 318
   at MASES.EntityFrameworkCore.KNet.Serialization.Json.DefaultKEFCoreSerDes.ValueContainer`1.JsonRaw`1.Deserialize(String topic, Byte[] data) in /_/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs:line 313
   at MASES.KNet.Streams.KeyValue`4..ctor(IGenericSerDesFactory factory, KeyValue`2 value, ISerDes`2 keySerDes, ISerDes`2 valueSerDes, Boolean fromPrefetched)
   at MASES.KNet.Streams.State.KeyValueIterator`4.PrefetchableLocalEnumerator.ConvertObject(Object input)
   at MASES.JCOBridge.C2JBridge.JVMBridgeBasePrefetchableEnumerator.Advance()
   at MASES.JCOBridge.C2JBridge.JVMBridgeBasePrefetchableEnumerator.Execute(Object o)
   at System.Threading.Thread.StartHelper.Callback(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)

and the process fails due to unhandled exception in the thread used to prefetch the information. The specific exception is System.Text.Json.JsonReaderException which reports LineNumber: 0 | BytePositionInLine: 0: seems the data received does not contains information, i.e. the array of bytes is empty.

The previous condition pairs with the following:

Unhandled exception. System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection. (Parameter 'startIndex')
   at System.BitConverter.ToInt32(Byte[] value, Int32 startIndex)
   at MASES.KNet.Serialization.KNetSerialization.DeserializeInt(Boolean fallbackToKafka, String topic, Byte[] data)
   at MASES.KNet.Serialization.SerDes`2.Deserialize(String topic, TJVMT data)
   at MASES.KNet.Streams.KeyValue`4..ctor(IGenericSerDesFactory factory, KeyValue`2 value, ISerDes`2 keySerDes, ISerDes`2 valueSerDes, Boolean fromPrefetched)
   at MASES.KNet.Streams.State.KeyValueIterator`4.PrefetchableLocalEnumerator.ConvertObject(Object input)
   at MASES.JCOBridge.C2JBridge.JVMBridgeBasePrefetchableEnumerator.Advance()
   at MASES.JCOBridge.C2JBridge.JVMBridgeBasePrefetchableEnumerator.Execute(Object o)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)

where System.ArgumentOutOfRangeException most probably reports that the value input of System.BitConverter.ToInt32 has a length of zero and MASES.KNet.Serialization.KNetSerialization.DeserializeInt try to start from zero: see explanation in https://learn.microsoft.com/it-it/dotnet/api/system.bitconverter.toint32?view=net-8.0#system-bitconverter-toint32(system-byte()-system-int32)

The first kind of exception shall be managed in KEFCore, while the second one is related to KNet.

Originally posted by @masesdevelopers in https://github.com/masesgroup/KEFCore/issues/243#issuecomment-2190218833

Other than null check, add a check on empty arrays.