tomasfabian / Joker

Reactive data changes from SQL server to .NET clients. SqlTableDependency extensions, Joker.OData, Joker.Redis, Joker.MVVM and ksqlDB LINQ provider
MIT License
69 stars 23 forks source link

Upgrading persistent queries #12

Closed tmonte closed 3 years ago

tmonte commented 3 years ago

On the Kafka.DotNet.ksqlDB.KSql.Linq package, are there any plans to support CREATE OR REPLACE STREAM statements when using CreateQueryStream<T>()? I tried to find a way to pass some kind of parameter but couldn't find one. Thanks!

tomasfabian commented 3 years ago

@tmonte thank you for your interesting question! Yes, why not, I/we can add this functionality. Creating a Stream is a little bit more complex task. Could you help me please to design the API?

We should probably separate this from CreateStreamQuery<T>().

I quickly prototyped some code, I will come back to you later. Please think about the bellow mentioned options.

Here is my general-purpose plain string statement execution (proposal):

  public partial interface IKSqlDbRestApiProvider
  {
    Task<HttpResponseMessage> ExecuteStatementAsync(string ksqlStatement);
  }

Type safe approach (the create or replace stream would be generated):

  public partial interface IKSqlDbRestApiProvider
  {
    Task<HttpResponseMessage> CreateStreamAsync<TEntity>(CreateStreamMetadata metadata);
    Task<HttpResponseMessage> CreateOrReplaceStreamAsync<TEntity>(CreateStreamMetadata metadata);
  }

  public class CreateStreamMetadata
  {
    public string KafkaTopic { get; set; } // required field. Convention Entity name?

    public short Partition { get; set; }

    //... more options
  }

Examples:

  public record Movies
  {
    [System.ComponentModel.DataAnnotations.Key]
    public int Id { get; set; } // Id prefix convention?, the key attribute would be optional

    public string Title { get; set; }

    public int Release_Year { get; set; }
  }

    public async Task ExecuteStatement()
    {
      var ksqlDbUrl = @"http:\\localhost:8088";

      IKSqlDbRestApiProvider restApiProvider = new KSqlDBContext(ksqlDbUrl);

      string statement = $@"CREATE TABLE {nameof(Movies)} (
        Id INT PRIMARY KEY,
        Title VARCHAR,
        Release_Year INT
      ) WITH (
        KAFKA_TOPIC='{nameof(Movies)}',
        PARTITIONS=1,
        VALUE_FORMAT = 'JSON'
      );";

      var httpResponseMessage = await restApiProvider.ExecuteStatementAsync(statement);
    }

    public async Task TypeSafe()
    {
      var ksqlDbUrl = @"http:\\localhost:8088";

      IKSqlDbRestApiProvider restApiProvider = new KSqlDBContext(ksqlDbUrl);

      var createStreamMetadata = new CreateStreamMetadata
      {
        KafkaTopic = "movies_stream"
      };

      var httpResponseMessage = await restApiProvider.CreateOrReplaceStreamAsync<Movies>(createStreamMetadata);
    }

Let me know what do you think. Thank you very much, Tomas.

tmonte commented 3 years ago

Looks great to me. I can look into getting a PR up when I have some time. Thanks!

tomasfabian commented 3 years ago

@tmonte I implemented the general-purpose CreateStatementAsync. If you don't mind I would like to track the progress of bellow mentioned tasks in this thread (issue). In the following days I will start working on CREATE STREAM AS SELECT task. In case that you would like to participate, let me know (update the status please) to avoid duplication of work.

Task Status
CreateStatementAsync - execution of general-purpose string statements RELEASED
CREATE STREAM - fluent API RELEASED
CREATE TABLE - fluent API RELEASED
CREATE STREAM AS SELECT - fluent API RELEASED
CREATE TABLE AS SELECT - fluent API RELEASED

TODO, WIP - WorkInProgress, TOTEST, DONE, RELEASED

Thanks, too.

tomasfabian commented 3 years ago

Hi @tmonte, I prepared the following fluent API for Create tables/streams as selects. What do you think?


public interface IKSqlDBStatementsContext
{
  IWithOrAsClause CreateStreamStatement(string streamName);
  IWithOrAsClause CreateOrReplaceStreamStatement(string streamName);
  IWithOrAsClause CreateTableStatement(string tableName);
  IWithOrAsClause CreateOrReplaceTableStatement(string tableName);
}

private async Task CreateOrReplaceTableStatement(IKSqlDBStatementsContext context)
{
  var creationMetadata = new CreationMetadata
  {
    KafkaTopic = "tweetsByTitle",       
    KeyFormat = SerializationFormats.Json,
    ValueFormat = SerializationFormats.Json,
    Replicas = 1
  };

  var httpResponseMessage = await context.CreateOrReplaceTableStatement(tableName: "TweetsByTitle")
    .With(creationMetadata)
    .As<Movie>()
    .Where(c => c.Id < 3)
    .Select(c => new {c.Title, ReleaseYear = c.Release_Year})
    .PartitionBy(c => c.Title)
    .ExecuteStatementAsync();

  /*
CREATE OR REPLACE TABLE TweetsByTitle
WITH ( KAFKA_TOPIC='tweetsByTitle', KEY_FORMAT='Json', VALUE_FORMAT='Json', REPLICAS='1' )
AS SELECT Title, Release_Year AS ReleaseYear FROM Movies
WHERE Id < 3 PARTITION BY Title EMIT CHANGES;
   */

  var statementResponse = httpResponseMessage.ToStatementResponses();
}

Thanks, Tomas!

tomasfabian commented 3 years ago

Hi @tmonte, could you please review whether everything you would expect regarding upgrading queries was implemented correctly? v0.9 v0.11

Please close this Issue/feature request if everything is Ok or suggest any improvements.

Thank you, Tomas

tmonte commented 3 years ago

Hey @tomasfabian sorry for the late response, it looks great to me! Thanks so much for getting to it so quickly, awesome work on the library! Closing :)

tomasfabian commented 3 years ago

You are welcome @tmonte.

FYI: kafka.dotnet.ksqldb was moved to a separate repository

I started a new project for destructuring the databases (database inside out pattern) so we can push data where it is needed. Please check out the example in Blazor server side project (Kafka.DotNet.ksqlDb.Experimental.sln) and let me know what do you think.

I'm plannig to add also an example for Change data capture from SQL Server.

Thank you, regards Tomas