tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Google protobuf 'repeated' data types not supported (read-only properties are not included in insert statements) #12

Closed vijaymandave closed 2 years ago

vijaymandave commented 2 years ago

Hi,

Referring to issue: https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/issues/10

I am using the code below to insert records into a Kafka table with the NuGet package Kafka.DotNet.ksqlDB, version 2.0.0.

POCO class

using Google.Protobuf.Collections;
public class Kafka_table_order
{
    public string Rowkey { get; set; }
    public int Id { get; set; }
    public long Ordertime { get; set; }
    public RepeatedField<double> Items { get;  }
}

RepeatedField<T> is the Google Protobuf collection type used for 'repeated' fields; it implements IEnumerable<T>.

C# Insert record code

using Kafka.DotNet.ksqlDB.KSql.Linq;
using Kafka.DotNet.ksqlDB.KSql.Linq.PullQueries;
using Kafka.DotNet.ksqlDB.KSql.Linq.Statements;
using Kafka.DotNet.ksqlDB.KSql.Query.Context;
using Kafka.DotNet.ksqlDB.KSql.RestApi;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Statements;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Statements.Properties;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Text;             // StringBuilder
using System.Threading.Tasks;  // Task<T>

public async Task<bool> Write<T>(List<T> models)
{
    var httpClientFactory = new HttpClientFactory(new Uri(_url));
    var restApiClient = new KSqlDbRestApiClient(httpClientFactory);
    var properties = new InsertProperties() { ShouldPluralizeEntityName = false };
    var builder = new StringBuilder();
    foreach (var model in models)
    {
        var bulkStatementSql = restApiClient.ToInsertStatement(model, properties).Sql;
        builder.Append(bulkStatementSql);
    }

    var bulkStatement = new KSqlDbStatement(builder.ToString());
    var responseMessage = await restApiClient.ExecuteStatementAsync(bulkStatement);
    return responseMessage.IsSuccessStatusCode;
}

Generated SQL query:

INSERT INTO kafka_table_order(rowkey,id,ordertime) VALUES('12344', 25, 1633021200);

Issue: the generated INSERT statement does not include the Items column, which is a repeated double.

Please advise.

tomasfabian commented 2 years ago

Hi @vijaymandave, I've just tried the following example:

var order = new Kafka_table_order
{
    Rowkey = "12344",
    Id = 25,
    Ordertime = 1633021200,
    Items = new RepeatedField<double> { 1.1, 2.3 }
};

await Write(new List<Kafka_table_order>() { order });

and the generated insert looks like the following:

INSERT INTO Kafka_table_order (Rowkey, Id, Ordertime, Items) VALUES ('12344', 25, 1633021200, ARRAY[1.1, 2.3]);

Your Items property doesn't have a setter. Is that intentional?

Edit: Without a setter the property is not included in the insert statement, so I had to add one for the example above:

public RepeatedField<double> Items { get; set; }

Regards, Tomas

tomasfabian commented 2 years ago

Hi @vijaymandave, by convention, read-only properties are excluded when INSERT statements are generated. Were you able to solve your issue by adding a setter to your IEnumerable property?
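The convention can be pictured with plain reflection. The sketch below is only an illustration, not the library's actual code: `OrderWithGetter`, `OrderWithSetter` and `WritablePropertyNames` are hypothetical names, `List<double>` stands in for `RepeatedField<double>` so the snippet needs no extra packages, and it assumes the filter is based on `PropertyInfo.CanWrite`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical model with a getter-only collection, as in the original report.
public class OrderWithGetter
{
    public int Id { get; set; }
    public List<double> Items { get; } = new List<double>();
}

// Same hypothetical model, but Items declares a public setter.
public class OrderWithSetter
{
    public int Id { get; set; }
    public List<double> Items { get; set; } = new List<double>();
}

public static class InsertColumns
{
    // Returns the names of public instance properties that declare a set accessor.
    // Assumption: this mirrors the "skip read-only properties" convention.
    public static string[] WritablePropertyNames(Type type) =>
        type.GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .Where(p => p.CanWrite)
            .Select(p => p.Name)
            .ToArray();
}
```

Under that assumption, `InsertColumns.WritablePropertyNames(typeof(OrderWithGetter))` contains `Id` but not `Items`, while the `OrderWithSetter` variant contains both.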

With the latest version of the package you can also simplify your Write method as follows (I added support for bulk inserts; the result should be the same):

using System.Collections.Generic;
using System.Threading.Tasks;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Properties;

public static async Task<bool> Write<T>(List<T> models)
{
  await using KSqlDBContext context = new KSqlDBContext("http://localhost:8088");

  var properties = new InsertProperties { ShouldPluralizeEntityName = false };

  foreach (var model in models)
    context.Add(model, properties);

  var responseMessage = await context.SaveChangesAsync();

  return responseMessage.IsSuccessStatusCode;
}

The package had to be renamed from Kafka.DotNet.ksqlDB to ksqlDB.RestApi.Client. It will be necessary to migrate/fix the namespaces in your code base:

Install-Package ksqlDb.RestApi.Client -Version 1.3.0

Another option is to use init-only properties:

public class Kafka_table_order {
  public RepeatedField<double> Items { get; init; } = new();
}

or a record type (C# 9+):

public record Kafka_table_order (RepeatedField<double> Items);
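Both alternatives keep the property immutable after construction while still exposing a set accessor to reflection, which is presumably why they are picked up. A small check of that behaviour, with hypothetical type names and `List<double>` standing in for `RepeatedField<double>`:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class with an init-only collection property.
public class OrderWithInit
{
    public List<double> Items { get; init; } = new();
}

// Hypothetical positional record; the compiler generates an init-only Items property.
public record OrderRecord(List<double> Items);

public static class InitOnlyCheck
{
    // True when reflection reports a set (or init) accessor for the named property.
    public static bool HasSetAccessor(Type type, string propertyName) =>
        type.GetProperty(propertyName)?.CanWrite ?? false;
}
```

`InitOnlyCheck.HasSetAccessor` returns true for `Items` on both types, and the values can still be supplied once at construction, for example `new OrderWithInit { Items = new() { 1.1, 2.3 } }`.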

Tomas

vijaymandave commented 2 years ago

Hi @tomasfabian, yes, after adding a setter it works.

However, due to some constraints I cannot add a setter to the repeated property in the POCO class. Instead, I use the AddRange() method to populate repeated properties. Could you suggest how to handle this scenario?

I appreciate your work. Thanks. -Vijay

tomasfabian commented 2 years ago

Hi @vijaymandave, I added the config option InsertProperties.IncludeReadOnlyProperties in ksqlDb.RestApi.Client version 1.3.1 (the default is false):

var insertProperties = new InsertProperties
{
    IncludeReadOnlyProperties = true
};
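For a getter-only collection that is filled through `AddRange()`, the effect of such an option can be sketched as follows. This is a toy generator written for illustration, not the library's implementation: `Order`, `ToyInsert.Build` and the `includeReadOnly` parameter are hypothetical, `List<double>` stands in for `RepeatedField<double>`, and only strings, numbers and sequences of doubles are formatted.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;

// Hypothetical model; Items is getter-only and is filled with AddRange.
public class Order
{
    public string Rowkey { get; set; }
    public int Id { get; set; }
    public List<double> Items { get; } = new List<double>();
}

public static class ToyInsert
{
    // Builds an INSERT statement from public properties. Read-only properties
    // are skipped unless includeReadOnly is true (assumed behaviour).
    // Note: reflection does not formally guarantee the order of properties.
    public static string Build<T>(T entity, bool includeReadOnly)
    {
        var props = typeof(T)
            .GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .Where(p => p.CanWrite || includeReadOnly)
            .ToArray();

        var columns = string.Join(", ", props.Select(p => p.Name));
        var values = string.Join(", ", props.Select(p => Format(p.GetValue(entity))));
        return "INSERT INTO " + typeof(T).Name + " (" + columns + ") VALUES (" + values + ");";
    }

    // Formats a value as a SQL literal using the invariant culture.
    private static string Format(object value) => value switch
    {
        null => "NULL",
        string s => "'" + s.Replace("'", "''") + "'",
        IEnumerable<double> items => "ARRAY[" + string.Join(", ",
            items.Select(d => d.ToString(CultureInfo.InvariantCulture))) + "]",
        IFormattable f => f.ToString(null, CultureInfo.InvariantCulture),
        _ => value.ToString()
    };
}
```

After `order.Items.AddRange(new[] { 1.1, 2.3 })`, calling `ToyInsert.Build(order, includeReadOnly: false)` yields a statement without an `Items` column, while `includeReadOnly: true` adds `Items` with the value `ARRAY[1.1, 2.3]`.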

Tomas

vijaymandave commented 2 years ago

Thank you @tomasfabian. IncludeReadOnlyProperties = true is working for me.