tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

How can I use enums on my data types when creating streams / tables? #55

Closed: mrt181 closed this issue 9 months ago

mrt181 commented 9 months ago

I am deriving code from a JSON schema. The schema defines string properties that are restricted to a few enum values.

These are generated as C# enums.

When I create a statement like this:

var metadata = new EntityCreationMetadata
{
  KafkaTopic = "Test",
  EntityName = "Test",
  Partitions = 1,
  Replicas = 1,
  ValueFormat = SerializationFormats.Json,
};
var createOrReplaceTableStatement = StatementGenerator.CreateOrReplaceTable<TestClass>(metadata);

I get this output:

CREATE OR REPLACE TABLE Test (
        node STRUCT<Id BIGINT, Name VARCHAR, Type >,
        limits ARRAY<STRUCT<End TIMESTAMP, Max BIGINT, Min BIGINT, Period , Start TIMESTAMP>>,
        direction ,
        end TIMESTAMP,
        id VARCHAR PRIMARY KEY,
        start TIMESTAMP
) WITH ( KAFKA_TOPIC='Test', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='1' );

None of the enum fields have a type attached to them. I have added the Struct annotation to the classes that are object properties in the schema, but that does not help with the enums. How can I map enums as STRING / VARCHAR?

mrt181 commented 9 months ago

On the statement-creation side, might this do it?

    internal static string KSqlTypeTranslator(Type type)
    ...
    else if (!type.IsGenericType && (type.IsClass || type.IsStruct()))
    {
      str1 = type.Name.ToUpper();
    }
    else if (type.IsEnum)
    {
      str1 = "VARCHAR";
    }
    else
    ...

This treats the column values as strings, because they are strings in the underlying JSON data. When reading them, though, they need to be parsed back into the enum.
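To make the idea concrete, here is a minimal, self-contained sketch of a type translator that maps enums to VARCHAR. `SimpleKSqlTypeTranslator` and `IsPlainStruct` are hypothetical names, the mapping covers only a handful of CLR types, and this is not the library's real `KSqlTypeTranslator`. It also shows why the enum check should not be shadowed by a struct check: enums are non-primitive value types, so a struct test based only on `IsValueType && !IsPrimitive` would match them too.

```csharp
using System;

// Hypothetical, simplified translator for illustration only;
// it is NOT ksqlDb.RestApi.Client's KSqlTypeTranslator.
public static class SimpleKSqlTypeTranslator
{
    // Mimics a struct check that matches any non-primitive value type.
    // Enums are non-primitive value types, so this would also match them.
    private static bool IsPlainStruct(Type type) => type.IsValueType && !type.IsPrimitive;

    public static string Translate(Type type)
    {
        // Treat PortType? the same as PortType.
        type = Nullable.GetUnderlyingType(type) ?? type;

        // Enum values travel as their names in the JSON payload, so map them to VARCHAR.
        // This check must run before the struct branch below.
        if (type.IsEnum) return "VARCHAR";

        if (type == typeof(string)) return "VARCHAR";
        if (type == typeof(int)) return "INT";
        if (type == typeof(long)) return "BIGINT";
        if (type == typeof(bool)) return "BOOLEAN";
        if (type == typeof(double)) return "DOUBLE";

        // Other non-generic classes/structs become named types,
        // like the type.Name.ToUpper() branch in the proposed change.
        if (!type.IsGenericType && (type.IsClass || IsPlainStruct(type)))
            return type.Name.ToUpper();

        throw new NotSupportedException($"No ksqlDB type mapping for {type}.");
    }
}

public enum PortType { Kafka, Snowflake }
```

With this ordering, `Translate(typeof(PortType))` and `Translate(typeof(PortType?))` both yield `VARCHAR` instead of falling into the named-type branch.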

tomasfabian commented 9 months ago

Hello @mrt181, your modification of the KSqlTypeTranslator method appears to be accurate. Just to confirm, am I correct in understanding that the JSON data will include the enumeration member's name rather than its numeric value? For instance, in the example below, should the JSON value "Kafka" be interpreted as PortType.Kafka?

internal enum PortType
{
  Kafka = 0,
  Snowflake = 1,
}
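The distinction being asked about can be seen with `System.Text.Json`: by default it writes an enum as its underlying number, while `JsonStringEnumConverter` writes and reads the member name. The `PortTypeJson` helper below is a hypothetical illustration, not part of ksqlDb.RestApi.Client.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

public enum PortType
{
    Kafka = 0,
    Snowflake = 1,
}

// Hypothetical helper contrasting value-based and name-based enum JSON.
public static class PortTypeJson
{
    // Options that (de)serialize enums by member name, e.g. "Kafka".
    private static readonly JsonSerializerOptions NameOptions = new()
    {
        Converters = { new JsonStringEnumConverter() }
    };

    // Default behavior: the numeric value, e.g. 0.
    public static string AsValue(PortType p) => JsonSerializer.Serialize(p);

    // Name-based: the JSON string "Kafka", which fits a VARCHAR column.
    public static string AsName(PortType p) => JsonSerializer.Serialize(p, NameOptions);

    // Parses a JSON string such as "\"Kafka\"" back into the enum.
    public static PortType FromName(string json) =>
        JsonSerializer.Deserialize<PortType>(json, NameOptions);
}
```

If the payload carries names, a VARCHAR column is the natural mapping; if it carried numbers, an integer column would be.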
mrt181 commented 9 months ago

Yes, the schema type is string, restricted to an enumeration of valid values. Here is a full example:

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "http://example.org/foo",
  "type": "object",
  "required": [ "id", "portType", "node" ],
  "additionalProperties": false,
  "properties": {
    "id": {
      "type": "string",
      "minLength": 3
    },
    "portType": {
      "type": "string",
      "enum": [ "Kafka", "Snowflake"]
    },
    "node": {
      "type": "object",
      "properties": {
        "left": {
          "type": "string"
        },
        "right": {
          "type": "string"
        }
      }
    }
  }
}

And here is the code (generated with quicktype), where I manually added the StructAttribute to the Node class and the KeyAttribute to the Id property to get the correct types in the statement:

// <auto-generated />
//
// To parse this JSON data, add NuGet 'System.Text.Json' then do:
//
//    using Example;
//
//    var test = Test.FromJson(jsonString);
#nullable enable
#pragma warning disable CS8618
#pragma warning disable CS8601
#pragma warning disable CS8603

namespace Example
{
    using System;
    using System.Collections.Generic;

    using System.Text.Json;
    using System.Text.Json.Serialization;
    using System.Globalization;
    using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations; // [Key], [Struct]

    public partial class Test
    {
        [JsonPropertyName("id")]
        [JsonConverter(typeof(MinMaxLengthCheckConverter))]
        [Key]
        public virtual string Id { get; set; }

        [JsonPropertyName("node")]
        public virtual Node Node { get; set; }

        [JsonPropertyName("portType")]
        public virtual PortType PortType { get; set; }
    }

    [Struct]
    public partial class Node
    {
        [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
        [JsonPropertyName("left")]
        public virtual string Left { get; set; }

        [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
        [JsonPropertyName("right")]
        public virtual string Right { get; set; }
    }

    public enum PortType { Kafka, Snowflake };

    public partial class Test
    {
        public static Test FromJson(string json) => JsonSerializer.Deserialize<Test>(json, Example.Converter.Settings);
    }

    public static class Serialize
    {
        public static string ToJson(this Test self) => JsonSerializer.Serialize(self, Example.Converter.Settings);
    }

    internal static class Converter
    {
        public static readonly JsonSerializerOptions Settings = new(JsonSerializerDefaults.General)
        {
            Converters =
            {
                PortTypeConverter.Singleton,
                new DateOnlyConverter(),
                new TimeOnlyConverter(),
                IsoDateTimeOffsetConverter.Singleton
            },
        };
    }

    internal class MinMaxLengthCheckConverter : JsonConverter<string>
    {
        public override bool CanConvert(Type t) => t == typeof(string);

        public override string Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        {
            var value = reader.GetString();
            if (value.Length >= 3)
            {
                return value;
            }
            throw new Exception("Cannot unmarshal type string");
        }

        public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options)
        {
            if (value.Length >= 3)
            {
                JsonSerializer.Serialize(writer, value, options);
                return;
            }
            throw new Exception("Cannot marshal type string");
        }

        public static readonly MinMaxLengthCheckConverter Singleton = new MinMaxLengthCheckConverter();
    }

    internal class PortTypeConverter : JsonConverter<PortType>
    {
        public override bool CanConvert(Type t) => t == typeof(PortType);

        public override PortType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        {
            var value = reader.GetString();
            switch (value)
            {
                case "Snowflake":
                    return PortType.Snowflake;
                case "Kafka":
                    return PortType.Kafka;
            }
            throw new Exception("Cannot unmarshal type PortType");
        }

        public override void Write(Utf8JsonWriter writer, PortType value, JsonSerializerOptions options)
        {
            switch (value)
            {
                case PortType.Snowflake:
                    JsonSerializer.Serialize(writer, "Snowflake", options);
                    return;
                case PortType.Kafka:
                    JsonSerializer.Serialize(writer, "Kafka", options);
                    return;
            }
            throw new Exception("Cannot marshal type PortType");
        }

        public static readonly PortTypeConverter Singleton = new PortTypeConverter();
    }

    public class DateOnlyConverter : JsonConverter<DateOnly>
    {
        private readonly string serializationFormat;
        public DateOnlyConverter() : this(null) { }

        public DateOnlyConverter(string? serializationFormat)
        {
            this.serializationFormat = serializationFormat ?? "yyyy-MM-dd";
        }

        public override DateOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        {
            var value = reader.GetString();
            return DateOnly.Parse(value!);
        }

        public override void Write(Utf8JsonWriter writer, DateOnly value, JsonSerializerOptions options)
            => writer.WriteStringValue(value.ToString(serializationFormat));
    }

    public class TimeOnlyConverter : JsonConverter<TimeOnly>
    {
        private readonly string serializationFormat;

        public TimeOnlyConverter() : this(null) { }

        public TimeOnlyConverter(string? serializationFormat)
        {
            this.serializationFormat = serializationFormat ?? "HH:mm:ss.fff";
        }

        public override TimeOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        {
            var value = reader.GetString();
            return TimeOnly.Parse(value!);
        }

        public override void Write(Utf8JsonWriter writer, TimeOnly value, JsonSerializerOptions options)
            => writer.WriteStringValue(value.ToString(serializationFormat));
    }

    internal class IsoDateTimeOffsetConverter : JsonConverter<DateTimeOffset>
    {
        public override bool CanConvert(Type t) => t == typeof(DateTimeOffset);

        private const string DefaultDateTimeFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.FFFFFFFK";

        private DateTimeStyles _dateTimeStyles = DateTimeStyles.RoundtripKind;
        private string? _dateTimeFormat;
        private CultureInfo? _culture;

        public DateTimeStyles DateTimeStyles
        {
            get => _dateTimeStyles;
            set => _dateTimeStyles = value;
        }

        public string? DateTimeFormat
        {
            get => _dateTimeFormat ?? string.Empty;
            set => _dateTimeFormat = (string.IsNullOrEmpty(value)) ? null : value;
        }

        public CultureInfo Culture
        {
            get => _culture ?? CultureInfo.CurrentCulture;
            set => _culture = value;
        }

        public override void Write(Utf8JsonWriter writer, DateTimeOffset value, JsonSerializerOptions options)
        {
            string text;

            if ((_dateTimeStyles & DateTimeStyles.AdjustToUniversal) == DateTimeStyles.AdjustToUniversal
                || (_dateTimeStyles & DateTimeStyles.AssumeUniversal) == DateTimeStyles.AssumeUniversal)
            {
                value = value.ToUniversalTime();
            }

            text = value.ToString(_dateTimeFormat ?? DefaultDateTimeFormat, Culture);

            writer.WriteStringValue(text);
        }

        public override DateTimeOffset Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        {
            string? dateText = reader.GetString();

            if (string.IsNullOrEmpty(dateText) == false)
            {
                if (!string.IsNullOrEmpty(_dateTimeFormat))
                {
                    return DateTimeOffset.ParseExact(dateText, _dateTimeFormat, Culture, _dateTimeStyles);
                }
                else
                {
                    return DateTimeOffset.Parse(dateText, Culture, _dateTimeStyles);
                }
            }
            else
            {
                return default(DateTimeOffset);
            }
        }

        public static readonly IsoDateTimeOffsetConverter Singleton = new IsoDateTimeOffsetConverter();
    }
}
#pragma warning restore CS8618
#pragma warning restore CS8601
#pragma warning restore CS8603
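When a VARCHAR column value is read back, a generic alternative to a per-enum switch such as `PortTypeConverter.Read` is sketched below. `StrictEnum.TryParseName` is a hypothetical helper. It matches exact, case-sensitive member names, because `Enum.TryParse` on its own also accepts numeric strings like "1" and comma-separated lists like "Kafka, Snowflake", which a JSON schema `enum` would reject.

```csharp
using System;

public enum PortType { Kafka, Snowflake }

// Hypothetical helper: parses an enum member name read from a VARCHAR column.
public static class StrictEnum
{
    public static bool TryParseName<TEnum>(string value, out TEnum result)
        where TEnum : struct, Enum
    {
        result = default;

        // Accept only exact, case-sensitive member names such as "Kafka",
        // mirroring the switch in PortTypeConverter.Read.
        if (value == null || Array.IndexOf(Enum.GetNames(typeof(TEnum)), value) < 0)
            return false;

        result = (TEnum)Enum.Parse(typeof(TEnum), value);
        return true;
    }
}
```

Unlike the generated converter, this works for any enum type, at the cost of a name lookup per value.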
tomasfabian commented 9 months ago

Thank you very much for the input @mrt181. Could you try out and review this PR, please?

mrt181 commented 9 months ago

I am looking into it.

I cloned the project into Linux/WSL and had to update the project references to fix the build, as in this patch: test-csproj.patch

There are two identical project files whose names differ only in letter case: ksqlDB.RestApi.Client.csproj and ksqlDb.RestApi.Client.csproj. I guess it only works on Windows because the file system is case-insensitive and treats them as the same file?
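The clash can be reproduced in a few lines: grouping names with a case-insensitive comparer, as Windows file systems do by default, collapses the two project files into one key, while a case-sensitive Linux file system keeps both. `CaseCollisions` is a hypothetical helper, not part of the repository.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for spotting file names that differ only in letter case.
public static class CaseCollisions
{
    // Returns groups of names that are equal ignoring case but not equal ordinally;
    // such files can coexist on Linux but collide on a case-insensitive file system.
    public static List<string[]> Find(IEnumerable<string> fileNames) =>
        fileNames
            .GroupBy(name => name, StringComparer.OrdinalIgnoreCase)
            .Select(g => g.Distinct(StringComparer.Ordinal).ToArray())
            .Where(names => names.Length > 1)
            .ToList();
}
```

Passing the repository's file names to `Find` would report the two `.csproj` files as one collision group.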

tomasfabian commented 9 months ago

I hadn't noticed it until now. I removed ksqlDB.RestApi.Client.csproj in the feature branch. Thanks!

If it's more convenient for you, I can also publish a release candidate on NuGet.

mrt181 commented 9 months ago

I had to fix three tests to use Environment.NewLine: parsertests-newline.patch. I also found flaky tests whose names start with "WindowedBy": they always succeed when I run them individually, but not always when I run the whole suite. The connector-related tests also fail, but that's likely because my setup is different: these tests need a running environment and do not use Testcontainers to provide it.

A pre-release candidate would be really great.
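The newline failures typically come from hard-coded line endings in expected strings: `Environment.NewLine` is "\n" on Linux and "\r\n" on Windows. A minimal sketch of the two usual fixes, building the expectation with `Environment.NewLine` or normalizing both sides before comparing; `NewLineHelpers` is hypothetical and not taken from the repository's tests.

```csharp
using System;

// Hypothetical test helpers for platform-independent string comparisons.
public static class NewLineHelpers
{
    // Builds an expected multi-line string using the current platform's line ending.
    public static string Lines(params string[] lines) =>
        string.Join(Environment.NewLine, lines);

    // Normalizes CRLF and lone CR to LF so comparisons ignore the platform.
    public static string NormalizeNewLines(string text) =>
        text.Replace("\r\n", "\n").Replace("\r", "\n");
}
```

Building expectations with `Lines` keeps assertions exact per platform, while `NormalizeNewLines` is handy when the generated text itself may mix line endings.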

tomasfabian commented 9 months ago

dotnet add package ksqlDb.RestApi.Client --version 3.6.0-rc.2

tomasfabian commented 9 months ago

I applied all your changes. Thank you for the feedback so far @mrt181.

mrt181 commented 9 months ago

This works, thank you

tomasfabian commented 9 months ago

Did you mean that the changes can be merged into the main branch, or only that using the package instead of cloning the repository works?

mrt181 commented 9 months ago

From my point of view, this can be merged into main.

tomasfabian commented 9 months ago

I merged it :) Thank you very much for this enhancement and your improvements!

tomasfabian commented 9 months ago

@mrt181 I've created a Vagrantfile tailored for Linux development on a Windows host. My next step is to adapt the tests so they run seamlessly across different operating systems.

tomasfabian commented 9 months ago

Hello @mrt181, I resolved the issues with the unit tests to ensure compatibility with Linux systems.