tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Google protobuf 'repeated' data types not supported (read-only properties are not included in pull queries) #14

Closed vijaymandave closed 2 years ago

vijaymandave commented 2 years ago

Hi,

I am using the code below to read records from a Kafka table with the NuGet package ksqlDB.RestApi.Client, version 1.3.1.

POCO class

using Google.Protobuf.Collections;
public class Kafka_table_order
{
    public string Rowkey { get; set; }
    public int Id { get; set; }
    public long Ordertime { get; set; }
    public RepeatedField<double> Items { get;  }
}

RepeatedField<T> is a Google protobuf type that implements IEnumerable<T>.

C# read record code

using Kafka.DotNet.ksqlDB.KSql.Linq;
using Kafka.DotNet.ksqlDB.KSql.Linq.PullQueries;
using Kafka.DotNet.ksqlDB.KSql.Linq.Statements;
using Kafka.DotNet.ksqlDB.KSql.Query.Context;
using Kafka.DotNet.ksqlDB.KSql.RestApi;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Statements;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Statements.Properties;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Threading.Tasks;

public override async Task<List<T>> Read<T>(string tableName, Expression<Func<T, bool>> conditions)
{
    var options = new KSqlDBContextOptions(_url) { ShouldPluralizeFromItemName = false };

    // Allow pull queries that scan the table instead of requiring a key lookup.
    options.QueryParameters.Properties["ksql.query.pull.table.scan.enabled"] = "true";

    await using var context = new KSqlDBContext(options);

    tableName = string.Concat("QUERYABLE_", tableName);

    var list = await context.CreatePullQuery<T>(tableName)
        .Where(conditions)
        .GetManyAsync()
        .ToListAsync();
    return list;
}

Issue: In the fetched records, the Items property (the repeated data type) is not populated.

Please advise.

tomasfabian commented 2 years ago

Hi @vijaymandave, ksqlDB.RestApi.Client uses System.Text.Json to deserialize (materialize) the incoming values, and System.Text.Json does not populate collection properties that have no setter.

In this case the important part is that RepeatedField<T> implements ICollection<T>:

public interface ICollection<T> : IEnumerable<T>
{
  void Add(T item);
  //..
}
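
The behavior described above can be reproduced without ksqlDB or protobuf. The following is a minimal sketch, not the client's actual code: `ReadOnlyItems` and `ReadOnlyCollectionDemo` are hypothetical names, and `List<double>` stands in for `RepeatedField<double>` so that the example depends only on System.Text.Json.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for the POCO in the question.
// Items has a getter only, like the RepeatedField<double> property.
public class ReadOnlyItems
{
    public int Id { get; set; }
    public List<double> Items { get; } = new List<double>();
}

public static class ReadOnlyCollectionDemo
{
    // Deserializes with default System.Text.Json behavior
    // (only case-insensitive name matching is switched on).
    public static ReadOnlyItems Deserialize(string json)
    {
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        return JsonSerializer.Deserialize<ReadOnlyItems>(json, options);
    }
}
```

Calling `ReadOnlyCollectionDemo.Deserialize("{\"ID\":1,\"ITEMS\":[1.5,2.5]}")` sets `Id`, but `Items` stays empty, because the serializer skips the property that has no setter instead of calling `Add` on the existing collection.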

Would it be a viable option for you to register (plug in) a custom JSON converter with KSqlDbProvider?

A workaround from Stack Overflow can be found here, or you can implement your own solution.
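
One possible shape for such a converter is sketched below. It is an illustration under stated assumptions, not the Stack Overflow workaround and not part of ksqlDB.RestApi.Client: `Order`, `OrderJsonConverter` and `OrderJson` are hypothetical names, `List<double>` again stands in for `RepeatedField<double>`, and the incoming row is assumed to arrive as a JSON object keyed by column name.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical stand-in for Kafka_table_order; Items has no setter.
public class Order
{
    public string Rowkey { get; set; }
    public int Id { get; set; }
    public long Ordertime { get; set; }
    public List<double> Items { get; } = new List<double>();
}

// Fills the read-only collection through Add instead of a setter.
public class OrderJsonConverter : JsonConverter<Order>
{
    public override Order Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        using var document = JsonDocument.ParseValue(ref reader);
        var order = new Order();

        foreach (var property in document.RootElement.EnumerateObject())
        {
            // Match names case-insensitively (assumption: columns may arrive upper-cased).
            switch (property.Name.ToUpperInvariant())
            {
                case "ROWKEY": order.Rowkey = property.Value.GetString(); break;
                case "ID": order.Id = property.Value.GetInt32(); break;
                case "ORDERTIME": order.Ordertime = property.Value.GetInt64(); break;
                case "ITEMS":
                    if (property.Value.ValueKind == JsonValueKind.Array)
                    {
                        foreach (var item in property.Value.EnumerateArray())
                        {
                            order.Items.Add(item.GetDouble());
                        }
                    }
                    break;
            }
        }

        return order;
    }

    public override void Write(Utf8JsonWriter writer, Order value, JsonSerializerOptions options)
    {
        writer.WriteStartObject();
        writer.WriteString("Rowkey", value.Rowkey);
        writer.WriteNumber("Id", value.Id);
        writer.WriteNumber("Ordertime", value.Ordertime);
        writer.WriteStartArray("Items");
        foreach (var item in value.Items)
        {
            writer.WriteNumberValue(item);
        }
        writer.WriteEndArray();
        writer.WriteEndObject();
    }
}

public static class OrderJson
{
    // Helper that deserializes one row with the converter registered.
    public static Order Parse(string json)
    {
        var options = new JsonSerializerOptions();
        options.Converters.Add(new OrderJsonConverter());
        return JsonSerializer.Deserialize<Order>(json, options);
    }
}
```

With this in place, `OrderJson.Parse("{\"ID\":1,\"ITEMS\":[1.5,2.5]}")` returns an `Order` whose `Items` holds both values. A converter written per type is the simplest option; a `JsonConverterFactory` would be the more general route if many types expose read-only collections.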

Proposed ksqlDB.RestApi.Client APIs:

var contextOptions = new KSqlDbContextOptionsBuilder()
        .UseKSqlDb(ksqlDbUrl)
        .SetJsonSerializerOptions(c =>
        {
          c.Converters.Add(new CustomJsonConverter());
        }).Options;

and

contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
  .SetJsonSerializerOptions(serializerOptions =>
  {
    serializerOptions.Converters.Add(new CustomJsonConverter());
  });

vijaymandave commented 2 years ago

Hi @tomasfabian, the proposed solution looks fine to me. Please integrate it into the ksqlDB.RestApi.Client API; I will test it and share the result with you. Thank you.

tomasfabian commented 2 years ago

Hi @vijaymandave, I released ksqlDb.RestApi.Client version 1.4.0 with the above-mentioned functionality.

Thank you, too.

vijaymandave commented 2 years ago

Thanks a lot, @tomasfabian. Registering a custom JSON converter with KSqlDbProvider works for me, as you suggested.

tomasfabian commented 2 years ago

You are welcome, @vijaymandave.