tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

JsonException when a field contains a ']' #43

Closed blatda closed 1 year ago

blatda commented 1 year ago

A record in my stream has a string field containing a closing bracket (']'), and it causes an exception during a push query. The exception is passed to the OnError handler and the query stops. Is there a way to contain the exception within line processing (skip the malformed line) and keep the query running?

The only proper solution I can see is to escape all strings before sending them to Kafka. Is there any other?
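
The skip-the-malformed-line idea can be sketched outside the library. In Rx, OnError terminates the sequence, so a bad line has to be caught before it reaches the observer. The following is only a sketch under assumptions: DeserializeSkippingMalformed is a hypothetical helper, not part of ksqlDb.RestApi.Client, and the sample lines are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Made-up sample lines (assumption): the second one is deliberately malformed.
var rawLines = new[]
{
    "{\"Id\":1,\"Name\":\"John]\"}", // valid JSON: ']' inside a string is allowed
    "{\"Id\":2,\"Name\":\"Jane\"",   // malformed: missing closing brace
    "{\"Id\":3,\"Name\":\"Joe\"}"
};

var clients = DeserializeSkippingMalformed<Client>(
    rawLines,
    (line, e) => Console.WriteLine($"Skipped '{line}': {e.Message}")).ToList();

Console.WriteLine(clients.Count); // 2: the malformed line was skipped

// Hypothetical helper, not a library API: deserializes each raw line on its own,
// so one malformed line is reported and skipped instead of ending the sequence.
static IEnumerable<T> DeserializeSkippingMalformed<T>(
    IEnumerable<string> rawLines, Action<string, JsonException> onSkipped)
{
    foreach (var line in rawLines)
    {
        T? value;
        try
        {
            value = JsonSerializer.Deserialize<T>(line);
        }
        catch (JsonException e)
        {
            onSkipped(line, e); // report the bad line and move on
            continue;
        }

        // yield must stay outside the try/catch block
        if (value is not null)
            yield return value;
    }
}

record Client
{
    public long Id { get; set; }
    public string? Name { get; set; }
}
```

Whether the client exposes a hook for this kind of per-line handling is not stated in this thread, so treat the helper purely as an illustration of the approach.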

System.Text.Json.JsonException: ':' is invalid after a value. Expected either ',', '}', or ']'. Path: $.after | LineNumber: 4 | BytePositionInLine: 20.
 ---> System.Text.Json.JsonReaderException: ':' is invalid after a value. Expected either ',', '}', or ']'. LineNumber: 4 | BytePositionInLine: 20.
   at System.Text.Json.ThrowHelper.ThrowJsonReaderException(Utf8JsonReader& json, ExceptionResource resource, Byte nextByte, ReadOnlySpan`1 bytes)
   at System.Text.Json.Utf8JsonReader.ConsumeNextToken(Byte marker)
   at System.Text.Json.Utf8JsonReader.ConsumeNextTokenOrRollback(Byte marker)
   at System.Text.Json.Utf8JsonReader.ReadSingleSegment()
   at System.Text.Json.Utf8JsonReader.Read()
   at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
   at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   --- End of inner exception stack trace ---
   at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, JsonReaderException ex)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo jsonTypeInfo, Nullable`1 actualByteCount)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 json, JsonTypeInfo jsonTypeInfo)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](String json, JsonSerializerOptions options)
   at ksqlDB.RestApi.Client.KSql.RestApi.Query.KSqlDbQueryProvider.CreateRowValue[T](String rawJson)
   at ksqlDB.RestApi.Client.KSql.RestApi.Query.KSqlDbQueryProvider.OnLineRead[T](String rawJson)
   at ksqlDB.RestApi.Client.KSql.RestApi.KSqlDbProvider.ConsumeAsync[T](StreamReader streamReader, SemaphoreSlim semaphoreSlim, CancellationToken cancellationToken)+MoveNext()
   at ksqlDB.RestApi.Client.KSql.RestApi.KSqlDbProvider.ConsumeAsync[T](StreamReader streamReader, SemaphoreSlim semaphoreSlim, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at ksqlDB.RestApi.Client.KSql.RestApi.KSqlDbProvider.Run[T](Object parameters, CancellationToken cancellationToken)+MoveNext()
   at ksqlDB.RestApi.Client.KSql.RestApi.KSqlDbProvider.Run[T](Object parameters, CancellationToken cancellationToken)+MoveNext()
   at ksqlDB.RestApi.Client.KSql.RestApi.KSqlDbProvider.Run[T](Object parameters, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at System.Linq.AsyncEnumerable.ToObservableObservable`1.<>c__DisplayClass2_0.<<Subscribe>g__Core|0>d.MoveNext() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs:line 50

tomasfabian commented 1 year ago

Hi @blatda, could you please provide an example record that causes the error above?
A short code snippet of your model and query would also help me reproduce the issue.

The error message is complaining about a colon:

':' is invalid after a value

Thank you in advance

blatda commented 1 year ago

Thanks for the quick reaction! I couldn't reproduce exactly the same error; it evidently depends on the exact position of the ']'. But the following throws a very similar one:

// Topic: 'clients' contains Key: {"Id": 1}, Value: {"Before": {"Id":1, "Name": "John]"}, "After": {"Id":1, "Name": "John"}}
// Stream: CREATE STREAM CLIENTS (keystruct STRUCT<Id INT> KEY, Id INT, Name STRING) WITH (KAFKA_TOPIC='clients', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;

const string kSqlDbUrl = "http://yourksqlserver:8088"; // placeholder host; include the URL scheme
const string kStreamName = "CLIENTS";

IDisposable subscription = RunQuery();

Console.WriteLine("Waiting for query results... press key to exit.");
Console.ReadKey();

subscription.Dispose();

IDisposable RunQuery()
{
    KSqlDBContext kContext = new KSqlDBContext(kSqlDbUrl);

    return kContext.CreateQuery<CdcObject>(kStreamName)
        .WithOffsetResetPolicy(AutoOffsetReset.Earliest)
        .ToObservable()
        .Subscribe(client =>
        {
            Console.WriteLine($"Next: '{client}'");
        }, e =>
        {
            Console.WriteLine($"Error: '{e}'.");
        }, () =>
        {
            Console.WriteLine("Completed.");
        });
}

record CdcObject
{
    public Client? Before { get; set; }
    public Client? After { get; set; }
}
record Client
{
    public long Id { get; set; }
    public string? Name { get; set; }
}

If the Client object is not wrapped in CdcObject, it is deserialized correctly.
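
One way to narrow this down is to pass the same topic value straight to System.Text.Json, bypassing the client entirely. This is a minimal sketch that reuses the CdcObject and Client records from the repro above; the json constant is the value from the 'clients' topic comment.

```csharp
using System;
using System.Text.Json;

// The value from the 'clients' topic in the repro above.
const string json =
    "{\"Before\": {\"Id\":1, \"Name\": \"John]\"}, \"After\": {\"Id\":1, \"Name\": \"John\"}}";

// Deserialize directly with System.Text.Json, without going through the client.
var cdc = JsonSerializer.Deserialize<CdcObject>(json);

Console.WriteLine(cdc?.Before?.Name); // John]
Console.WriteLine(cdc?.After?.Name);  // John

record CdcObject
{
    public Client? Before { get; set; }
    public Client? After { get; set; }
}

record Client
{
    public long Id { get; set; }
    public string? Name { get; set; }
}
```

Since the serializer accepts a ']' inside a nested string, the failure is presumably introduced somewhere before deserialization, for example while the raw response lines are prepared. That is an assumption; the thread does not state the root cause.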

tomasfabian commented 1 year ago

Thank you for the details, @blatda! I believe I have found a solution for your issue.

Could you please try out the RC I've just published?

dotnet add package ksqlDb.RestApi.Client --version 2.7.0-rc.2

Let me know if you find any issues or have suggestions for improvement.

Thank you again!

blatda commented 1 year ago

Yes, it works! Thank you for such a quick fix.