tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Some Google protobuf data types not supported #10

vijaymandave closed this issue 2 years ago

vijaymandave commented 3 years ago

Hi,

I am using the code below to insert records into a Kafka table with the NuGet package Kafka.DotNet.ksqlDB, version 1.8.0.

Proto message:

message kafka_table_order {
    string rowkey = 1;
    uint32 id = 2;
    int64 ordertime = 3;
    repeated double items = 4;
}

C# code:

using Kafka.DotNet.ksqlDB.KSql.Linq;
using Kafka.DotNet.ksqlDB.KSql.Linq.PullQueries;
using Kafka.DotNet.ksqlDB.KSql.Linq.Statements;
using Kafka.DotNet.ksqlDB.KSql.Query.Context;
using Kafka.DotNet.ksqlDB.KSql.RestApi;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Statements;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Statements.Properties;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;

public async Task<bool> Write<T>(List<T> models)
{
    var httpClientFactory = new HttpClientFactory(new Uri(_url));
    var restApiClient = new KSqlDbRestApiClient(httpClientFactory);
    var properties = new InsertProperties() { ShouldPluralizeEntityName = false };

    // Build one INSERT statement per model and concatenate them into a single batch.
    var builder = new StringBuilder();
    foreach (var model in models)
    {
        var bulkStatementSql = restApiClient.ToInsertStatement(model, properties).Sql;
        builder.Append(bulkStatementSql);
    }

    // Send all statements to the ksqlDB REST API in one request.
    var bulkStatement = new KSqlDbStatement(builder.ToString());
    var responseMessage = await restApiClient.ExecuteStatementAsync(bulkStatement);
    return responseMessage.IsSuccessStatusCode;
}

Response error:

400 Bad Request

Generated SQL query:

INSERT INTO kafka_table_order(rowkey,id,ordertime) VALUES('12344', 25, 1633021200);

Issue: The generated INSERT statement does not include the items column, which has the repeated double data type.

tomasfabian commented 3 years ago

Hi @vijaymandave, could you please also add an example of your C# POCO class? For example:

public record Kafka_table_order
{
    public string Rowkey { get; set; }
    public int Id { get; set; }
    public long Ordertime { get; set; }
    public IEnumerable<double> Items { get; set; }
}

thx

tomasfabian commented 3 years ago

The NuGet package Kafka.DotNet.ksqlDB version 1.8.0 supports List<T> and T[].
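
For anyone still on 1.8.0, one possible workaround is to declare the collection property as `List<double>` or `double[]` instead of `IEnumerable<double>`. The record below is only a minimal sketch: it reuses the `Kafka_table_order` shape from the example record earlier in this thread, and because the original poster's actual class was not shown, every property name and type here is an assumption.

```csharp
using System.Collections.Generic;

// Assumed POCO shape; it mirrors the example record earlier in this thread.
public record Kafka_table_order
{
    public string Rowkey { get; set; }
    public int Id { get; set; }
    public long Ordertime { get; set; }

    // List<double> (or double[]) is one of the collection types that
    // version 1.8.0 is stated to support.
    public List<double> Items { get; set; }
}
```

An instance of such a record could then be passed to the `Write` method from the original post. Whether the `items` column then appears in the generated INSERT statement would still need to be checked against the installed package version.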

@vijaymandave I released v1.9.0. Could you please verify that inserting enumerables now works?