tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.

How to specify types as `Struct` when using the modelbuilder? #89

Closed mrt181 closed 1 month ago

mrt181 commented 1 month ago

When mapping types to CREATE statements, it's necessary to annotate a class with the StructAttribute when that class is used as a property type on the stream or table being created.

The type translator checks for that attribute and generates the correct column definition:

      else if (!type.IsGenericType && type.TryGetAttribute<StructAttribute>() != null)
      {
        var ksqlProperties = GetProperties(type, escaping);

        ksqlType = $"{KSqlTypes.Struct}<{string.Join(", ", ksqlProperties)}>";
      }
      else if (!type.IsGenericType && (type.IsClass || type.IsStruct()))
      {
        ksqlType = type.Name.ToUpper();
      }
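
To make the effect of the two branches concrete, here is a minimal, self-contained sketch. It is not the library's implementation: `StructAttribute` is redeclared locally as a stand-in, and `TypeTranslatorSketch`, `Annotated` and `Plain` are hypothetical names with a deliberately tiny primitive mapping.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for the library's StructAttribute, declared locally
// so that the sketch compiles on its own.
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)]
public sealed class StructAttribute : Attribute { }

[Struct]
public class Annotated
{
  public string Key { get; set; } = "";
}

public class Plain
{
  public string Key { get; set; } = "";
}

public static class TypeTranslatorSketch
{
  // Simplified mapping for a few primitives; anything else is translated recursively.
  private static string Map(Type t) =>
    t == typeof(string) ? "VARCHAR" :
    t == typeof(int) ? "INT" :
    t == typeof(byte[]) ? "BYTES" :
    Translate(t);

  public static string Translate(Type type)
  {
    // Annotated types are expanded into a STRUCT<...> column definition.
    if (!type.IsGenericType && type.GetCustomAttribute<StructAttribute>() != null)
    {
      var fields = type.GetProperties().Select(p => $"{p.Name} {Map(p.PropertyType)}");
      return $"STRUCT<{string.Join(", ", fields)}>";
    }

    // Without the attribute, only the upper-cased type name is emitted.
    return type.Name.ToUpper();
  }
}
```

With these assumptions, `TypeTranslatorSketch.Translate(typeof(Annotated))` yields `STRUCT<Key VARCHAR>`, while `TypeTranslatorSketch.Translate(typeof(Plain))` yields `PLAIN`, which is why an unannotated class does not produce a usable column definition.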

The model builder supports setting Key, Ignore, Decimal and ColumnName:


modelBuilder.Entity<Payment>()
  .Property(b => b.Amount)
  .Decimal(precision: 10, scale: 2);

modelBuilder.Entity<Payment>()
  .Property(b => b.Description)
  .HasColumnName("Desc");

modelBuilder.Entity<Account>()
  .HasKey(c => c.Id);

modelBuilder.Entity<Account>()
  .Property(b => b.Secret)
  .Ignore();

How can the model builder be used to create the correct ksql type definition when StructAttribute is not set?

tomasfabian commented 1 month ago

Hello @mrt181, that's a missing feature in the model builder that needs to be implemented.

tomasfabian commented 1 month ago

Hello @mrt181, could you please confirm my proposal or suggest an alternative way to specify the struct types using the model builder?

private record KeyValuePair
{
  public string Key { get; set; } = null!;
  public byte[] Value { get; set; } = null!;
}

private record Record
{
  public KeyValuePair[] Headers { get; init; } = null!;
}

ModelBuilder builder = new();

builder.Entity<Record>()
        .Property(b => b.Headers)
        .AsStruct();

var creationMetadata = new EntityCreationMetadata("my_topic", partitions: 3);

var ksql = new StatementGenerator(builder).CreateTable<Record>(creationMetadata, ifNotExists: true);
Console.WriteLine(ksql);

Would result in the following output:

CREATE TABLE IF NOT EXISTS Records (
      Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>>
) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3' );

mrt181 commented 1 month ago

Would do it like that too.

This particular example is great for another reason. Let's mark Headers as an actual HEADERS column:

ModelBuilder builder = new();

builder.Entity<Record>()
        .Property(b => b.Headers)
        .AsStruct();

builder.Entity<Record>()
        .Property(b => b.Headers)
        .WithHeaders();

var creationMetadata = new EntityCreationMetadata("my_topic", partitions: 3);

var ksql = new StatementGenerator(builder).CreateTable<Record>(creationMetadata, ifNotExists: true);
Console.WriteLine(ksql);

Would result in the following output:

CREATE TABLE IF NOT EXISTS Records (
      Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>> HEADERS
) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3');

But this causes a Bad Request when an INSERT statement is generated for the entity, because the SQL could look like this:

INSERT INTO Records (Headers) VALUES (ARRAY[STRUCT(Key:='foo',Value:='ABC')]); -- Value needs to be some byte value

Inserting into a HEADERS column is disallowed (see: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/insert-values/)

To solve this, WithHeaders() also needs to mark the property as excluded from inserts, similar to the IgnoreByInsertsAttribute.

This attribute should also be available in the ModelBuilder.
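
The intended behaviour can be sketched as follows. This is only an illustration under assumptions: `ColumnSketch` and `InsertSketch` are hypothetical names, not types from ksqlDb.RestApi.Client, and the `IsHeaders` flag stands in for whatever metadata WithHeaders() would record.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical column metadata; IsHeaders marks a column declared with HEADERS.
public record ColumnSketch(string Name, string ValueLiteral, bool IsHeaders = false);

public static class InsertSketch
{
  // Builds an INSERT statement and skips HEADERS columns,
  // because ksqlDB rejects inserts into them.
  public static string BuildInsert(string entity, IEnumerable<ColumnSketch> columns)
  {
    var insertable = columns.Where(c => !c.IsHeaders).ToList();
    var names = string.Join(", ", insertable.Select(c => c.Name));
    var values = string.Join(", ", insertable.Select(c => c.ValueLiteral));

    return $"INSERT INTO {entity} ({names}) VALUES ({values});";
  }
}
```

For example, passing a `Key` column with the literal `'k1'` and a `Headers` column flagged with `IsHeaders: true` produces `INSERT INTO Records (Key) VALUES ('k1');`, so the HEADERS column never reaches the statement.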

Also, the fields of the struct in a HEADERS column must always be uppercase and must be named KEY and VALUE.

CREATE TABLE Records (Key STRING PRIMARY KEY, Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>> HEADERS) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3');
-- works because unquoted identifiers are automatically uppercased

-- with IdentifierEscaping.Always
CREATE TABLE `Records` (`Key` STRING PRIMARY KEY, `Headers` ARRAY<STRUCT<`Key` VARCHAR, `Value` BYTES>> HEADERS) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3');
-- Failed to prepare statement: Invalid type for HEADERS column: expected ARRAY<STRUCT<`KEY` STRING, `VALUE` BYTES>>, got ARRAY<STRUCT<`Key` STRING, `Value` BYTES>>

-- with IdentifierEscaping.Always
CREATE TABLE `Records` (`Key` STRING PRIMARY KEY, `Headers` ARRAY<STRUCT<`KEY` VARCHAR, `VALUE` BYTES>> HEADERS) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3');
-- works
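
The three cases above follow from how ksqlDB resolves identifiers: unquoted names are uppercased, while backtick-quoted names keep their case. A small sketch of that rule, using the hypothetical helper names `Resolve` and `IsValidHeadersStruct` (neither is part of the client library):

```csharp
using System;

public static class IdentifierSketch
{
  // Unquoted identifiers are uppercased; backtick-quoted identifiers keep their case.
  public static string Resolve(string identifier)
  {
    bool quoted = identifier.Length >= 2
      && identifier[0] == '`'
      && identifier[identifier.Length - 1] == '`';

    return quoted
      ? identifier.Substring(1, identifier.Length - 2)
      : identifier.ToUpperInvariant();
  }

  // A HEADERS column only accepts struct fields that resolve to KEY and VALUE.
  public static bool IsValidHeadersStruct(string keyField, string valueField) =>
    Resolve(keyField) == "KEY" && Resolve(valueField) == "VALUE";
}
```

Under this rule, `IsValidHeadersStruct("Key", "Value")` and ``IsValidHeadersStruct("`KEY`", "`VALUE`")`` return true, while ``IsValidHeadersStruct("`Key`", "`Value`")`` returns false, matching the statements that work and the one that fails.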

This can be worked around by overriding the serialized property names:

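// Requires: using System.Text.Json.Serialization;
private record KeyValuePair
{
  // Force the uppercase field names that a HEADERS column expects.
  [JsonPropertyName("KEY")]
  public string Key { get; set; } = null!;
  [JsonPropertyName("VALUE")]
  public byte[] Value { get; set; } = null!;
}

tomasfabian commented 1 month ago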

@mrt181, thank you for your great insights!
I've created a new feature proposal, "Specify IgnoreByInserts using the model builder" #90, based on your comments. Please feel free to suggest any improvements for the public API and other aspects. I also created a PR #91 for the new AsStruct feature. Could you please review and test it?

dotnet add package ksqlDb.RestApi.Client --version 6.3.0-rc.1

Thank you in advance!

Alternatively, the Key and Value fields could be uppercased using the fluent API:

modelBuilder.Entity<KeyValuePair>()
  .Property(c => c.Key)
  .HasColumnName(nameof(KeyValuePair.Key).ToUpper());

modelBuilder.Entity<KeyValuePair>()
  .Property(c => c.Value)
  .HasColumnName(nameof(KeyValuePair.Value).ToUpper());

tomasfabian commented 1 month ago

ksqlDb.RestApi.Client 6.3.0 is available now:

dotnet add package ksqlDb.RestApi.Client --version 6.3.0