tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Fetch records synchronously #6

Closed: vijaymandave closed this issue 3 years ago

vijaymandave commented 3 years ago

@tomasfabian,

Reference: #5

We have created an order table and a queryable_order table in ksqlDB as follows:

CREATE TABLE order (id INT PRIMARY KEY, eventtime BIGINT, ordertype INT , description VARCHAR) WITH (KAFKA_TOPIC='order', VALUE_FORMAT='PROTOBUF', PARTITIONS = 1);
CREATE TABLE queryable_order AS SELECT * FROM order;

We have written the following C# function:

using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Kafka.DotNet.ksqlDB.KSql.Linq;
using Kafka.DotNet.ksqlDB.KSql.Linq.PullQueries;
using Kafka.DotNet.ksqlDB.KSql.Linq.Statements;
using Kafka.DotNet.ksqlDB.KSql.Query;
using Kafka.DotNet.ksqlDB.KSql.Query.Context;

public static class OrderQueries
{
  public static async Task<List<OrderData>> GetOrderStreamsAsync()
  {
    var url = "http://localhost:8088";
    await using var context = new KSqlDBContext(new KSqlDBContextOptions(url));
    var tablename = "queryable_order";
    var orderTypes = new List<int> { 1, 3 };

    // EventTime maps to the eventtime column of the table
    Expression<Func<OrderData, bool>> conditions = o =>
      o.EventTime >= 1630886400 && o.EventTime <= 1630887401 && orderTypes.Contains(o.OrderType);

    // Push query: the stream stays open and waits for new rows
    var enumerable = context.CreateQueryStream<OrderData>(tablename)
      .WithOffsetResetPolicy(AutoOffsetReset.Earliest)
      .Where(conditions)
      .ToAsyncEnumerable();

    var list = new List<OrderData>();
    await foreach (var item in enumerable.ConfigureAwait(false))
    {
      Console.WriteLine(item.ToString());
      list.Add(item);
    }

    return list;
  }
}

public class OrderData : Record
{
  public int Id { get; set; }
  public long EventTime { get; set; }
  public int OrderType { get; set; }
  public string Description { get; set; }
}

When we call it with var result = GetOrderStreamsAsync().Result; the await foreach (var item in enumerable.ConfigureAwait(false)) loop prints each item, but execution never gets past the loop, so the List<OrderData> is never returned.

tomasfabian commented 3 years ago

Hi @vijaymandave, push queries are never-ending. I think you have to limit the expected number of results; otherwise the query will keep waiting for more:

    var enumerable = context.CreateQueryStream<OrderData>(tablename)
      .WithOffsetResetPolicy(AutoOffsetReset.Earliest)
      .Where(conditions)
      .Take(1)
      .ToAsyncEnumerable();

Does this solve your issue?
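To see why the loop never exits, it helps to model a push query as an endless IAsyncEnumerable<T>: await foreach only finishes when the stream completes, and a push query never does. The self-contained sketch below is a simulation, not library code: SimulatedPushQuery and TakeAsync are hypothetical helpers, with TakeAsync playing the role of the client's .Take(1). It assumes C# 9 or later (top-level statements).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a push query: it keeps yielding rows and,
// like a real push query, never completes on its own.
static async IAsyncEnumerable<int> SimulatedPushQuery()
{
    var id = 0;
    while (true)
    {
        await Task.Delay(10); // pretend we are waiting for the next event
        yield return ++id;
    }
}

// Hypothetical helper playing the role of .Take(n): after 'count' items it
// ends the stream, which disposes the underlying endless source.
static async IAsyncEnumerable<T> TakeAsync<T>(IAsyncEnumerable<T> source, int count)
{
    if (count <= 0) yield break;
    var taken = 0;
    await foreach (var item in source)
    {
        yield return item;
        if (++taken >= count) yield break;
    }
}

// Same shape as GetOrderStreamsAsync: the return statement is reached
// only after the stream completes.
static async Task<List<int>> CollectAsync(IAsyncEnumerable<int> source)
{
    var list = new List<int>();
    await foreach (var item in source)
    {
        list.Add(item);
    }
    return list;
}

// CollectAsync(SimulatedPushQuery()) on its own would never return.
var firstThree = await CollectAsync(TakeAsync(SimulatedPushQuery(), 3));
Console.WriteLine(string.Join(", ", firstThree)); // prints "1, 2, 3"
```

With the real client, limiting the push query or switching to a pull query has the same effect: the stream completes, so the method can return its list.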

tomasfabian commented 3 years ago

@vijaymandave Another option is to use a pull query. Which one fits depends on your use case.

tomasfabian commented 3 years ago

@vijaymandave I had some time to experiment. Would this be a viable option for you? Since ksqlDB version 0.17.0 it is possible to run table scans with pull queries. AFAIK this means a full table (topic) scan, and because ksqlDB does not support range scans (secondary indexes), it can hurt your performance.

public static async Task<List<OrderData>> GetOrderStreamsAsync()
{
  var ksqlDbUrl = "http://localhost:8088";
  var options = new KSqlDBContextOptions(ksqlDbUrl) { ShouldPluralizeFromItemName = false };
  options.QueryParameters.Properties["ksql.query.pull.table.scan.enabled"] = "true";

  await using var context = new KSqlDBContext(options);
  var tableName = "queryable_order";
  var orderTypes = new List<int> { 1,3 };

  var enumerable = context.CreatePullQuery<OrderData>(tableName)    
    .Where(o => o.EventTime >= 1630886400 && o.EventTime <= 1630887401 && orderTypes.Contains(o.OrderType))
    .GetManyAsync();

  List<OrderData> list = new List<OrderData>();

  await foreach (var item in enumerable.ConfigureAwait(false))
  {
    Console.WriteLine(item.ToString());
    list.Add(item);
  } 

  return list;
}

What do you think? Did you find another solution?
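A rough in-memory analogy for the performance warning above: with only a primary key, a filter on id can go straight to the row, while a range filter on eventtime has to test every row. The sketch below uses made-up sample rows in a Dictionary to illustrate that difference; it is a conceptual model, not a description of how ksqlDB implements table scans internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Made-up rows standing in for queryable_order, keyed by the primary key (id).
var table = new Dictionary<int, (int Id, long EventTime, int OrderType, string Description)>
{
    [1] = (1, 1630886400, 1, "in range, type 1"),
    [2] = (2, 1630886500, 2, "in range, type 2"),
    [3] = (3, 1630887000, 3, "in range, type 3"),
    [4] = (4, 1630890000, 1, "out of range"),
};
var orderTypes = new List<int> { 1, 3 };

// Lookup by primary key: the row is found directly, no other rows are read.
string byKey = table.TryGetValue(3, out var row) ? row.Description : null;

// Filter on a non-key column: with no secondary index, every row must be
// read and tested against the predicate (a full scan).
var rowsVisited = 0;
var matches = table.Values
    .Where(o =>
    {
        rowsVisited++;
        return o.EventTime >= 1630886400 && o.EventTime <= 1630887401
            && orderTypes.Contains(o.OrderType);
    })
    .Select(o => o.Id)
    .ToList();

Console.WriteLine($"key lookup: {byKey}");
Console.WriteLine($"scan matched ids [{string.Join(", ", matches)}] after reading {rowsVisited} rows");
```

The cost of the scan grows with the size of the table, not with the number of matching rows, which is why the table scan property has to be enabled explicitly.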

vijaymandave commented 3 years ago

The push query below works for us:

    var enumerable = context.CreateQueryStream<OrderData>(tablename)
      .WithOffsetResetPolicy(AutoOffsetReset.Earliest)
      .Where(conditions)
      .Take(1)
      .ToAsyncEnumerable();

But we want a pull query that selects records in a particular range synchronously. You suggested the following for that, using GetManyAsync, but it is not working for us:

var enumerable = context.CreatePullQuery<OrderData>(tableName)    
    .Where(o => o.EventTime >= 1630886400 && o.EventTime <= 1630887401 && orderTypes.Contains(o.OrderType))
    .GetManyAsync();

And we cannot apply await as shown in the documentation:

 var result = await context.CreatePullQuery<IoTSensorStats>("avg_sensor_values")
    .Where(c => c.SensorId == "sensor-1")
    .GetAsync();
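If the calling code really cannot be async, one common pattern is to collect the finite pull-query results into a list and block on that task once, at the outermost synchronous call site. In the sketch below FakeGetManyAsync is a hypothetical stand-in for CreatePullQuery<OrderData>(...).GetManyAsync(); unlike a push query it completes, which is what makes blocking on it viable. GetAwaiter().GetResult() is used instead of .Result so that an exception surfaces as itself rather than wrapped in an AggregateException. It assumes C# 9 or later (top-level statements).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for CreatePullQuery<OrderData>(...).GetManyAsync():
// a pull query returns a finite result and then completes.
static async IAsyncEnumerable<int> FakeGetManyAsync()
{
    foreach (var id in new[] { 1, 3 })
    {
        await Task.Yield();
        yield return id;
    }
}

static async Task<List<int>> ToListAsync(IAsyncEnumerable<int> source)
{
    var list = new List<int>();
    await foreach (var item in source.ConfigureAwait(false))
    {
        list.Add(item);
    }
    return list;
}

// Synchronous wrapper for callers that cannot use await.
static List<int> GetOrdersSynchronously() =>
    ToListAsync(FakeGetManyAsync()).GetAwaiter().GetResult();

var orders = GetOrdersSynchronously();
Console.WriteLine(string.Join(", ", orders)); // prints "1, 3"
```

Blocking like this is reasonable in a console app or background worker; where a SynchronizationContext is present (for example a UI thread) it can deadlock, so keeping the call chain async is preferable there.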

tomasfabian commented 3 years ago

I published Kafka.DotNet.ksqlDB 1.7.0-rc.1 with IPullable<T>.GetManyAsync.

For your query it is also important to set the ksqlDB property ksql.query.pull.table.scan.enabled when using GetManyAsync:

options.QueryParameters.Properties["ksql.query.pull.table.scan.enabled"] = "true";

vijaymandave commented 3 years ago

Hi @tomasfabian,

Now it is working, but after building the project we sometimes get the following error:

error NU5104: A stable release of a package should not have a prerelease dependency. Either modify the version spec of dependency "Kafka.DotNet.ksqlDB [1.7.0-rc.1, )" or update the version field in the nuspec

Please publish a stable NuGet release.

Thank you, Vijay

tomasfabian commented 3 years ago

v1.7.0 was released.