tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

[Pull Query] JsonException when a field contains a comma #1

Closed tmrocha89 closed 2 years ago

tmrocha89 commented 2 years ago

When a string field contains a comma, a pull query throws an exception.

This is the stack trace of the exception:

 ---> System.Text.Json.JsonReaderException: '0x0A' is invalid within a JSON string. The string should be correctly escaped. LineNumber: 2 | BytePositionInLine: 17.
   at System.Text.Json.ThrowHelper.ThrowJsonReaderException(Utf8JsonReader& json, ExceptionResource resource, Byte nextByte, ReadOnlySpan`1 bytes)
   at System.Text.Json.Utf8JsonReader.ConsumeStringAndValidate(ReadOnlySpan`1 data, Int32 idx)
   at System.Text.Json.Utf8JsonReader.ConsumeString()
   at System.Text.Json.Utf8JsonReader.ConsumeValue(Byte marker)
   at System.Text.Json.Utf8JsonReader.ReadSingleSegment()
   at System.Text.Json.Utf8JsonReader.Read()
   at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   --- End of inner exception stack trace ---
   at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, JsonReaderException ex)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadCore[TValue](JsonConverter jsonConverter, Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadCore[TValue](Utf8JsonReader& reader, Type returnType, JsonSerializerOptions options)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](String json, Type returnType, JsonSerializerOptions options)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](String json, JsonSerializerOptions options)
   at Kafka.DotNet.ksqlDB.KSql.RestApi.Query.KSqlDbQueryProvider.CreateRowValue[T](String rawJson)
   at Kafka.DotNet.ksqlDB.KSql.RestApi.Query.KSqlDbQueryProvider.OnLineRead[T](String rawJson)
   at Kafka.DotNet.ksqlDB.KSql.RestApi.KSqlDbProvider.Run[T](Object parameters, CancellationToken cancellationToken)+MoveNext()
   at Kafka.DotNet.ksqlDB.KSql.RestApi.KSqlDbProvider.Run[T](Object parameters, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at System.Linq.AsyncEnumerable.<TryGetFirst>g__Core|291_0[TSource](IAsyncEnumerable`1 source, CancellationToken cancellationToken) in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/FirstOrDefault.cs:line 119
   at System.Linq.AsyncEnumerable.<TryGetFirst>g__Core|291_0[TSource](IAsyncEnumerable`1 source, CancellationToken cancellationToken) in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/FirstOrDefault.cs:line 121
   at System.Linq.AsyncEnumerable.<FirstOrDefaultAsync>g__Core|287_0[TSource](IAsyncEnumerable`1 source, CancellationToken cancellationToken) in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/FirstOrDefault.cs:line 30
   at Exp_ksql_pull_queries.Program.Main(String[] args) in /mnt/tmrochahd/cloud-poc/ConsoleKafkaExperiments/Exp_ksql_pull_queries/Program.cs:line 21
   at Exp_ksql_pull_queries.Program.Main(String[] args) in /mnt/tmrochahd/cloud-poc/ConsoleKafkaExperiments/Exp_ksql_pull_queries/Program.cs:line 24
   at Exp_ksql_pull_queries.Program.<Main>(String[] args)

The following code reproduces the error:

// Connect to the local ksqlDB server and disable pluralization of the source name.
var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088")
{
    ShouldPluralizeFromItemName = false
};

await using var context = new KSqlDBContext(contextOptions);

// Pull the row whose NAME value contains a comma ("Test1.8,8").
var result = await context.CreatePullQuery<Order>("tbl_orders_query")
    .Where(c => c.Order_Id == 1)
    .GetAsync();

Console.WriteLine($"Example _02: {result?.Order_Id} - {result?.name}");

Here is the information about my data in Kafka:

 Table Name       | Kafka Topic      | Key Format | Value Format | Windowed
---------------------------------------------------------------------------
 TBL_ORDERS_QUERY | TBL_ORDERS_QUERY | KAFKA      | AVRO         | false

Name                 : TBL_ORDERS_QUERY
 Field      | Type                           
---------------------------------------------
 ORDER_ID   | INTEGER          (primary key) 
 NAME       | VARCHAR(STRING)                
 CREATED_AT | INTEGER                        
---------------------------------------------

|ORDER_ID  |NAME       |CREATED_AT  |
+----------+-----------+------------+
|1         |Test1.8,8  |18833       |

I'm sorry if this is a known issue, but I haven't found any documentation or a workaround.
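
The message in the trace ('0x0A' is invalid within a JSON string) says the parser hit a line break inside a string value, which would fit a value being cut at its comma before the JSON was assembled. The sketch below illustrates that failure mode. It assumes the row arrives as a JSON array such as [1,"Test1.8,8",18833]; the helper names SplitNaively and SplitWithJsonParser are hypothetical and are not taken from the library's code.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical helper: strips the brackets and splits on every comma,
// including the comma inside the quoted string value.
static string[] SplitNaively(string rawRow) =>
    rawRow.Trim('[', ']').Split(',');

// Hypothetical helper: lets System.Text.Json tokenize the array,
// so a comma inside a string stays part of that string.
static string[] SplitWithJsonParser(string rawRow)
{
    using JsonDocument doc = JsonDocument.Parse(rawRow);
    return doc.RootElement.EnumerateArray()
        .Select(element => element.GetRawText())
        .ToArray();
}

// Assumed row payload, modelled on the table data above.
string rawRow = "[1,\"Test1.8,8\",18833]";

Console.WriteLine(SplitNaively(rawRow).Length);        // 4 pieces for 3 columns
Console.WriteLine(SplitWithJsonParser(rawRow).Length); // 3 values
Console.WriteLine(SplitWithJsonParser(rawRow)[1]);     // "Test1.8,8" (quotes included)
```

With the naive split, the NAME value is broken into "Test1.8 and 8", so any JSON rebuilt from those pieces contains an unterminated string. Whether the library did exactly this is an assumption on my part.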

tomasfabian commented 2 years ago

Hi @tmrocha89, thank you for the detailed explanation of the issue.

I published a release candidate with a bug fix: 1.3.0-rc.2.

Could you please verify it?

Regards, Tomas.

tmrocha89 commented 2 years ago

I forgot to mention that I was using v1.2.0.

You took away my chance to contribute to this project with your super-fast fix :) Everything is working correctly with 1.3.0-rc.2.

Thank you very much @tomasfabian.