tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Generic Pull query with filters #97

Status: Open. Opened by fsimonovskiii 3 weeks ago.

fsimonovskiii commented 3 weeks ago

Hi,

I have a use case where I need to reuse one pull query across multiple topics whose filter columns differ. My requirement is that the same call to ksqlDB is generic and reused for different types of data. For example, I have a materialized view for Sports ("view_sports") that contains these records:

[
    { "SportId": 1, "SportName": "Soccer" },
    { "SportId": 2, "SportName": "Basketball" }
]

And another materialized view for Categories ("view_categories"):

[
    { "SportId": 1, "CategoryId": "1", "CategoryName": "England" },
    { "SportId": 1, "CategoryId": "2", "CategoryName": "Spain" },
    { "SportId": 2, "CategoryId": "3", "CategoryName": "USA" }
]

I am trying to write a single generic pull query against ksqlDB that can read from both (or more) of these materialized views and accept filters on each view's own columns. For example, I want to be able to query all rows when needed:

SELECT * FROM view_categories;

as well as filter by different columns depending on the view:

SELECT * FROM view_categories WHERE CategoryId = '1';

or

SELECT * FROM view_sports WHERE SportId = 2;

In short, I need to return a list of rows from any of several views and apply each view's filters dynamically.
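One way to express this requirement in plain C# is to build the pull-query text from a view name plus a map of optional column filters. The sketch below is hypothetical: `PullQueryText` and `Build` are illustrative names, not part of ksqlDb.RestApi.Client. It assumes equality filters only, validates view and column names as plain identifiers, skips filters whose value is null, and renders strings as single-quoted ksqlDB literals with embedded quotes doubled.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of ksqlDb.RestApi.Client: builds the text of a pull
// query for any view, adding one equality predicate per non-null filter value.
public static class PullQueryText
{
    // View and column names must be plain identifiers, so caller-supplied names
    // cannot smuggle extra SQL into the statement (\z forbids a trailing newline).
    private static readonly Regex Identifier = new(@"^[A-Za-z_][A-Za-z0-9_]*\z");

    public static string Build(string view, IReadOnlyDictionary<string, object?> filters)
    {
        if (!Identifier.IsMatch(view))
            throw new ArgumentException($"Invalid view name: {view}", nameof(view));

        var predicates = new List<string>();
        foreach (var (column, value) in filters)
        {
            if (value is null) continue; // null means "do not filter by this column"
            if (!Identifier.IsMatch(column))
                throw new ArgumentException($"Invalid column name: {column}", nameof(filters));
            predicates.Add($"{column} = {ToLiteral(value)}");
        }

        string where = predicates.Count == 0 ? "" : " WHERE " + string.Join(" AND ", predicates);
        return $"SELECT * FROM {view}{where};";
    }

    // Strings become single-quoted literals with embedded quotes doubled;
    // numbers are formatted with the invariant culture.
    private static string ToLiteral(object value) => value switch
    {
        string s => "'" + s.Replace("'", "''") + "'",
        bool b => b ? "TRUE" : "FALSE",
        int or long or short or decimal or double or float =>
            ((IFormattable)value).ToString(null, CultureInfo.InvariantCulture),
        _ => throw new ArgumentException($"Unsupported filter type: {value.GetType()}")
    };
}
```

For example, `PullQueryText.Build("view_categories", new Dictionary<string, object?> { ["CategoryId"] = "1", ["SportId"] = null })` yields `SELECT * FROM view_categories WHERE CategoryId = '1';`, and the resulting string could then be sent to ksqlDB as a pull query.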

I have tried declaring separate, hardcoded KSqlDbStatement instances, passing the filter values as SessionVariables, and using a switch to choose the statement, like this:

using System.Collections.Generic;
using System.Text.Json;

string categoryId = "1";
string sportId = "1";

// CategoryId is a string column, so the substituted variable is wrapped in quotes;
// SportId is numeric, so its variable can stay unquoted.
KSqlDbStatement statementCategories = new("SELECT * FROM view_categories WHERE CategoryId = '${categoryId}';")
{
    SessionVariables = new Dictionary<string, object> { { "categoryId", categoryId } }
};

KSqlDbStatement statementSports = new("SELECT * FROM view_sports WHERE SportId = ${sportId};")
{
    SessionVariables = new Dictionary<string, object> { { "sportId", sportId } }
};

var response = await kSqlDbRestApiClient.ExecuteStatementAsync(statementCategories);

var content = await response.Content.ReadAsStringAsync();

var list = JsonSerializer.Deserialize<IEnumerable<JsonElement>>(content);

The code above returns a BadRequest response. Am I sending the variable incorrectly?

Alternatively, I saw examples using the KSqlDbContext object instead of KSqlDbRestApiClient that show this kind of filtering is possible, but they are strongly typed, which does not fit my use case. Is there a way to define multiple hardcoded column names but ultimately return a List<object> instead of a strongly typed object?

string sportId = null; // null means: do not filter by this column
string categoryId = "1";
string materializedView = "view_categories";

List<object> list = await context.CreatePullQuery<object>(materializedView)
    .Where(sportId == null || "SportId" == sportId) // does not compile; only illustrates the intended filter
    .Where(categoryId == null || "CategoryId" == categoryId) // does not compile; only illustrates the intended filter
    .GetManyAsync()
    .ToListAsync();

Any help is appreciated, thanks in advance!

tomasfabian commented 3 weeks ago

Hello @fsimonovskiii,

Why isn’t it feasible for you to switch between different type-safe queries?

context.CreatePullQuery<Category>("view_categories")
context.CreatePullQuery<Sport>("view_sports")

Here’s a workaround you can try in the meantime, but you will need to handle the raw data in the response:

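using System.Reflection;

// EndpointType is a non-public property of KSqlDbStatement; setting it to Query
// routes the statement to ksqlDB's query endpoint, which serves pull queries.
PropertyInfo? propertyInfo = typeof(KSqlDbStatement).GetProperty("EndpointType", BindingFlags.NonPublic | BindingFlags.Instance);

propertyInfo?.SetValue(statementCategories, EndpointType.Query);
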
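Handling that raw data generically could look like the following sketch, which turns the response body into one dictionary per row keyed by column name, so rows from any view can be processed without a model class. It assumes the body follows the shape of ksqlDB's /query response: a `header` object whose `schema` string lists backtick-quoted column names, followed by one `row` object per result with a `columns` array. `RawPullQueryResult` and `Parse` are hypothetical names, not library APIs.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: converts a raw /query response body into rows keyed by column name.
public static class RawPullQueryResult
{
    public static List<Dictionary<string, JsonElement>> Parse(string body)
    {
        using var doc = JsonDocument.Parse(body);
        var columns = new List<string>();
        var rows = new List<Dictionary<string, JsonElement>>();

        foreach (var item in doc.RootElement.EnumerateArray())
        {
            if (item.TryGetProperty("header", out var header))
            {
                columns = ColumnNames(header.GetProperty("schema").GetString() ?? "");
            }
            else if (item.TryGetProperty("row", out var row))
            {
                var dict = new Dictionary<string, JsonElement>();
                int i = 0;
                foreach (var value in row.GetProperty("columns").EnumerateArray())
                {
                    // Clone so the element stays valid after the JsonDocument is disposed.
                    dict[i < columns.Count ? columns[i] : $"col{i}"] = value.Clone();
                    i++;
                }
                rows.Add(dict);
            }
            // Other elements (for example error messages) are ignored in this sketch.
        }
        return rows;
    }

    // Extracts the top-level backtick-quoted names from a schema such as
    // "`SPORTID` INTEGER, `SPORTNAME` STRING"; names nested inside STRUCT<...> are skipped.
    private static List<string> ColumnNames(string schema)
    {
        var names = new List<string>();
        int depth = 0;
        for (int i = 0; i < schema.Length; i++)
        {
            char c = schema[i];
            if (c == '<') depth++;
            else if (c == '>') depth--;
            else if (c == '`')
            {
                int end = schema.IndexOf('`', i + 1);
                if (end < 0) break;
                if (depth == 0) names.Add(schema.Substring(i + 1, end - i - 1));
                i = end; // continue after the closing backtick
            }
        }
        return names;
    }
}
```

The string returned by `response.Content.ReadAsStringAsync()` could be passed to `Parse`; values stay as `JsonElement`, so the caller decides how to read each column (for example `GetInt32()` or `GetString()`).
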
fsimonovskiii commented 3 weeks ago

Hello @tomasfabian

Thank you for the quick reply. I prefer the CreatePullQuery option too; however, because this endpoint needs to be dynamic, I would have to define a separate model per topic and then create different contexts depending on the filters needed. I was hoping for a way to use a single context definition that returns a dynamic object.

I was just wondering whether there is an easy way to achieve this that I was missing.

tomasfabian commented 3 weeks ago

Another option would be to expose a new function for the KSqlDbRestApiClient:

Task<HttpResponseMessage> ExecuteQueryAsync(
        KSqlDbQuery ksqlDbQuery,
        CancellationToken cancellationToken = default);

Or something like this (just a prototype):

Task<QueryResult> ExecuteQueryAsync(KSqlDbQuery ksqlDbQuery, CancellationToken cancellationToken = default);

public class QueryResult : HttpResponseMessage
{
  public IEnumerable<Row> Rows { get; }
}

// Each row carries the raw text of one result row.
public record Row(string Value);

It would be great if you could share your insights and your approach.

tomasfabian commented 3 weeks ago

Or perhaps it would be better to extend the KSqlDbContext, as the KSqlDbRestApiClient is already quite large:

Task<QueryResult> ExecutePullQueryAsync(KSqlDbQuery ksqlDbQuery, CancellationToken cancellationToken = default);

fsimonovskiii commented 3 weeks ago

Hey,

ExecutePullQueryAsync sounds good. It would be even better if KSqlDbQuery could take dynamic parameters, similar to the Dapper approach; that might also make it possible to rename the selected columns with AS aliases in the SELECT statement. If this gets implemented as a new method, I will explore it as a potential refactor, since I do not see any other way to achieve what I want.
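To make the Dapper-style idea concrete, here is a hypothetical sketch of client-side parameter binding: `@name` placeholders are replaced with ksqlDB literals read from the matching properties of an anonymous object. `KsqlParameters.Bind` is an illustrative name, not an existing or proposed library API, and the sketch assumes that inlining quoted literals is acceptable for these pull queries.

```csharp
using System;
using System.Globalization;
using System.Reflection;
using System.Text.RegularExpressions;

// Hypothetical Dapper-style binder, not part of ksqlDb.RestApi.Client: replaces each
// @name placeholder with a ksqlDB literal read from the matching property of an object.
// Limitation: placeholders inside string literals are replaced too; a real
// implementation would need a proper tokenizer.
public static class KsqlParameters
{
    private static readonly Regex Placeholder = new(@"@([A-Za-z_][A-Za-z0-9_]*)");

    public static string Bind(string sql, object parameters)
    {
        Type type = parameters.GetType();
        return Placeholder.Replace(sql, match =>
        {
            string name = match.Groups[1].Value;
            PropertyInfo? property = type.GetProperty(
                name, BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);
            if (property is null)
                throw new ArgumentException($"No value supplied for @{name}", nameof(parameters));
            return ToLiteral(property.GetValue(parameters), name);
        });
    }

    // Quoted strings with doubled single quotes, invariant-culture numbers.
    private static string ToLiteral(object? value, string name) => value switch
    {
        null => throw new ArgumentException($"@{name} is null; nulls are not handled in this sketch"),
        string s => "'" + s.Replace("'", "''") + "'",
        bool b => b ? "TRUE" : "FALSE",
        int or long or short or decimal or double or float =>
            ((IFormattable)value).ToString(null, CultureInfo.InvariantCulture),
        _ => throw new ArgumentException($"Unsupported type for @{name}: {value.GetType()}")
    };
}
```

For example, binding `"SELECT CategoryId AS Id, CategoryName AS Name FROM view_categories WHERE CategoryId = @categoryId;"` with `new { categoryId = "1" }` produces the same statement with `'1'` in place of the placeholder, while the AS aliases give every view a common set of column names.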

For now, I will probably end up going with what you suggested:

context.CreatePullQuery<Category>("view_categories")
context.CreatePullQuery<Sport>("view_sports")

and write several strongly typed methods, which should make the filters easier to handle.