tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License
94 stars 25 forks source link

Multiple push queries #72

Closed jbkuczma closed 5 months ago

jbkuczma commented 5 months ago

I am writing a GraphQL app where a subscription feeds events from multiple push queries into a single stream for a web app to consume. Below is an example of the code


        // dependency injection
        services.AddDbContext<IMyKSqlDbContext, MyKSqlDbContext>(options =>
        {
            var builder = options
                .UseKSqlDb("endpoint")
                .SetBasicAuthCredentials("apiKey", "apiSecret")
                .SetupQueryStream(opt =>
                {
                    opt.AutoOffsetReset = AutoOffsetReset.Latest;
                    opt.Properties[KSqlDbConfigs.KsqlQueryPushV2Enabled] = "true";
                    opt.Properties[KSqlDbConfigs.KsqlQueryPushV2ContinuationTokensEnabled] = "true";
                });

            builder.Options.ShouldPluralizeFromItemName = false;
            builder.Options.DisposeHttpClient = true;
            builder.Options.IdentifierEscaping = IdentifierEscaping.Always;
        }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);

        // example usage
        var input = ctx.ArgumentValue<MyGraphQLInput>("input");
        var ksqlClient = ctx.Service<IMyKSqlDbContext>();
        var date = new DateOnly(input.Date.Year, input.Date.Month, input.Date.Day).ToString("O");

        var pushQueryOne = ksqlClient.CreateQueryStream<ModelOne>("MATERIALIZED_VIEW_ONE").Where(x => x.Date == date).ToAsyncEnumerable();
        var pushQueryTwo = ksqlClient.CreateQueryStream<ModelTwo>("MATERIALIZED_VIEW_TWO").Where(x => x.Date == date).ToAsyncEnumerable();

        return AsyncEnumerableEx.Merge<KSqlDbModel>(pushQueryOne, pushQueryTwo);

When looking at the console I see two requests executed by the KSqlDb client but they both use the latest query which eventually leads to a deserialization error since the models for each view differ.

info: ksqlDb.RestApi.Client[0]
      Executing query Sql: SELECT * FROM `MATERIALIZED_VIEW_ONE` WHERE (`Date` = '2024-04-08') EMIT CHANGES;
      Parameters:
      auto.offset.reset = latest
      ksql.query.push.v2.enabled = true
      ksql.query.push.v2.continuation.tokens.enabled = true

info: ksqlDb.RestApi.Client[0]
      Executing query Sql:SELECT * FROM `MATERIALIZED_VIEW_ONE` WHERE (`Date` = '2024-04-08') EMIT CHANGES;
      Parameters:
      auto.offset.reset = latest
      ksql.query.push.v2.enabled = true
      ksql.query.push.v2.continuation.tokens.enabled = true

dbug: ksqlDb.RestApi.Client[0]
      Raw data received: {"queryId":"transient_MATERIALIZED_VIEW_ONE_8554443953915907599","columnNames":["Id", "Date","EventType","Timestamp"],"columnTypes":["STRING","STRING","STRING","STRING"]}

dbug: ksqlDb.RestApi.Client[0]
      Raw data received: {"queryId":"transient_MATERIALIZED_VIEW_ONE_7088848852469522776","columnNames":["Id", "Date","EventType","Timestamp"],"columnTypes":["STRING","STRING","STRING","STRING"]}

I would expect to see two requests and each request sends its intended query.

tomasfabian commented 5 months ago

Hello @jbkuczma, I published a hot-fix:

dotnet add package ksqlDb.RestApi.Client --version 4.0.2
jbkuczma commented 5 months ago

My issue appears to be fixed with that fix. Thank you for taking a look at this so quickly!