tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Time types - Getting "Bad Request" from calling CreateStreamAsync / CreateOrReplaceStreamAsync #16

Closed DoubleE1 closed 2 years ago

DoubleE1 commented 2 years ago

Hi, I'm getting a 400 (Bad Request) status code when calling both CreateStreamAsync and CreateOrReplaceStreamAsync, but ExecuteStatementAsync created the stream successfully.

I'm using the following code to perform the action.

Environment:

Code

EntityCreationMetadata metadata = new EntityCreationMetadata
{
    KafkaTopic = nameof(MyClass),
    Partitions = 1,
    Replicas = 1,
    ValueFormat = SerializationFormats.Json
};

var httpClientFactory = new HttpClientFactory(new Uri(ksqlDbUrl));
var restApiClient = new KSqlDbRestApiClient(httpClientFactory);

var bulkStatement = new KSqlDbStatement("Create Stream MyClasses (UserId INT, DeviceId STRING, Weather STRING, Dt STRING, UnixDt STRING) with (Kafka_topic='MyClass', Value_format='JSON');");
var httpResponseMessage = await restApiClient.ExecuteStatementAsync(bulkStatement); // Successful

//httpResponseMessage = await restApiClient.CreateStreamAsync<MyClass>(metadata); // Bad Request

public class MyClass
{
    public int UserId { get; set; }
    public string DeviceId { get; set; }
    public string Weather { get; set; }
    public DateTime Dt { get; set; }
    public long UnixDt { get { return (long)this.Dt.ToUniversalTime().Subtract(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds; } }
}

DoubleE1 commented 2 years ago

[update] I tried removing the last two properties in "MyClass" and renaming the class, and it's working fine now. Other classes are working fine as well. Is there a workaround for this issue?

tomasfabian commented 2 years ago

Hi @DoubleE1, I'm reopening this issue because it looks like you have found a bug (a missing feature). The underlying data type should be DATE, not DATETIME (see the Time types documentation).

I will also try to add support for the TimeSpan type. These time types were added in ksqlDB v0.20.0. You can find the list of currently supported types here.

Thank you Tomas.

tomasfabian commented 2 years ago

Hi @DoubleE1. Could you please review the following type mappings?

ksql        C#
DATE        System.DateTime
TIME        System.TimeSpan
TIMESTAMP   System.DateTimeOffset

public record MyClass
{
  public DateTime Dt { get; set; }
  public TimeSpan Ts { get; set; }
  public DateTimeOffset DtOffset { get; set; }
}
var httpResponseMessage = await restApiClient.CreateStreamAsync<MyClass>(metadata);

Generated statement:

CREATE STREAM MyClasses (
    Dt DATE,
    Ts TIME,
    DtOffset TIMESTAMP
) WITH ( KAFKA_TOPIC='MyClass', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='1' );

var value = new MyClass
{
  Dt = new DateTime(2021, 4, 1),
  Ts = new TimeSpan(1, 2, 3),
  DtOffset = new DateTimeOffset(2021, 7, 4, 13, 29, 45, 447, TimeSpan.FromHours(4))
};

httpResponseMessage = await restApiClient.InsertIntoAsync(value);

Generated statement:

INSERT INTO MyClasses (Dt, Ts, DtOffset) VALUES ('2021-04-01', '01:02:03', '2021-07-04T13:29:45.447+04:00');

Output from ksqldb-cli:

print 'MyClass' from beginning;

rowtime: 2021/12/11 10:36:55.678 Z, key: <null>, value: {"DT":18718,"TS":3723000,"DTOFFSET":1625390985447}, partition: 0
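
The numbers in the printed record are consistent with DATE being written as days since the Unix epoch, TIME as milliseconds since midnight, and TIMESTAMP as milliseconds since the Unix epoch (UTC). The following is a minimal sketch of that reading, using only the .NET base class library; the class and method names are hypothetical and are not part of ksqlDb.RestApi.Client.

```csharp
using System;

// Hypothetical helper, not part of ksqlDb.RestApi.Client.
// It reproduces the numeric values seen in the ksqldb-cli output above.
public static class KsqlTimeEncoding
{
    private static readonly DateTime UnixEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // DATE: whole days between 1970-01-01 and the calendar date.
    public static int ToEpochDays(DateTime date) =>
        (int)(date.Date - UnixEpoch.Date).TotalDays;

    // TIME: milliseconds elapsed since midnight.
    public static long ToMillisOfDay(TimeSpan time) =>
        (long)time.TotalMilliseconds;

    // TIMESTAMP: milliseconds since the Unix epoch for the UTC instant.
    public static long ToEpochMillis(DateTimeOffset timestamp) =>
        timestamp.ToUnixTimeMilliseconds();
}
```

With the values from the insert above, ToEpochDays(new DateTime(2021, 4, 1)) gives 18718, ToMillisOfDay(new TimeSpan(1, 2, 3)) gives 3723000, and ToEpochMillis for 2021-07-04T13:29:45.447+04:00 gives 1625390985447, matching DT, TS and DTOFFSET in the record.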

DoubleE1 commented 2 years ago

Hi @tomasfabian,

I've gone through the type mappings you demonstrated above, and they look good to me.

Are the fixes going to be available in the next release?

By the way, is there any method available for inspecting the statement generated by CreateStreamAsync/CreateOrReplaceStreamAsync, something similar to the ToQueryString method?

Thank you.

tomasfabian commented 2 years ago

@DoubleE1 Sure, I published a release candidate with the functionality mentioned above. Please try it out and let me know whether everything works:

Install-Package ksqlDb.RestApi.Client -Version 1.5.0-rc.1

NOTE: I noticed that the ksqlDB REST API doesn't include the offset in the payload for TIMESTAMP values. This needs further investigation (possibly a bug in ksqlDB 0.22.0).

For example, the value '2021-07-04T13:29:45.447+04:00' is returned as '2021-07-04T09:29:45.447'.
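
In this example the returned text equals the UTC instant of the inserted value with the offset suffix dropped (13:29 at +04:00 is 09:29 UTC). The sketch below illustrates that observation and one way a client could read such a value back as UTC. It relies only on the .NET base class library; the class and method names are hypothetical, and treating offset-less values as UTC is an assumption drawn from this single example, not confirmed ksqlDB behavior.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of ksqlDb.RestApi.Client.
public static class TimestampOffsetSketch
{
    // Formats the UTC instant of a DateTimeOffset without an offset suffix.
    public static string ToUtcWithoutOffset(DateTimeOffset value) =>
        value.UtcDateTime.ToString("yyyy-MM-dd'T'HH:mm:ss.fff", CultureInfo.InvariantCulture);

    // Parses an offset-less timestamp, ASSUMING it denotes a UTC instant.
    public static DateTimeOffset ParseAssumingUtc(string text) =>
        DateTimeOffset.Parse(text, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal);
}
```

Under that assumption the instant itself survives the round trip and only the original +04:00 offset is lost, because DateTimeOffset equality compares UTC instants.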


I also implemented "Time types" for projections (SELECT) and filters (WHERE).

var from = new TimeSpan(11, 15, 0);
var to = new TimeSpan(15, 0, 0);

var query = context.CreateQueryStream<MyClass>()
        .Select(c => new { c.Ts, LocalTs = new TimeSpan(1, 0, 0) })
        .Where(c => c.Ts.Between(from, to));

There is currently no equivalent of ToQueryString for these statements (except for ToInsertStatement), but you can inspect the generated statements by enabling logging:

info: ksqlDb.RestApi.Client[0]
      Executing command: CREATE OR REPLACE TABLE movies ( title VARCHAR PRIMARY KEY, id INT, release_year INT ) WITH ( KAFKA_TOPIC='movies', PARTITIONS=1, VALUE_FORMAT = 'JSON' );
dbug: ksqlDb.RestApi.Client[0]
      Command response (OK): [{"@type":"currentStatus","statementText":"CREATE OR REPLACE TABLE MOVIES (TITLE STRING PRIMARY KEY, ID INTEGER, RELEASE_YEAR INTEGER) WITH (KAFKA_TOPIC='movies', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');","commandId":"table/MOVIES/create","commandStatus":{"status":"SUCCESS","message":"Table created","queryId":null},"commandSequenceNumber":278,"warnings":[]}]

Is it sufficient for your purposes?

Regards Tomas

DoubleE1 commented 2 years ago

Hi @tomasfabian,

Thanks for mentioning the possible bug in ksqlDB 0.22.0; I'll take note of that.

I have tested the functionality published in version 1.5.0-rc.1 with several different samples on my end, including all 3 type mappings mentioned above. Everything works fine, with no issues when creating or replacing streams.

Yep, enabling logging is good enough for my case. I was just looking for an easier way to obtain the generated statement without going into ksqldb-cli to run DESCRIBE [MyStream] EXTENDED all the time, and a way to get the statement whenever a 400 error status occurs.

Thank you so much.

tomasfabian commented 2 years ago

@DoubleE1 I found some edge cases, such as DateTime.Now, when including local (client-side) dates. Support for these will be available in the release version planned for next week.

var from = new TimeSpan(1, 0, 0);
var to = new TimeSpan(22, 0, 0);

query.Select(c => new { c.Ts, to, FromTime = from, DateTime.Now, New = new TimeSpan(1, 0, 0) })

Another option that came to mind is to inspect the HTTP response messages in the following manner:

using ksqlDB.RestApi.Client.KSql.RestApi.Extensions;

var httpResponseMessage = await restApiClient.CreateStreamAsync<MyClass>(metadata);

string responseContent = await httpResponseMessage.Content.ReadAsStringAsync();
// or
var statementResponses = await httpResponseMessage.ToStatementResponsesAsync();
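
If only the raw responseContent string is at hand, it can also be inspected with System.Text.Json. The sketch below pulls out statementText and commandStatus.status, the field names visible in the logged command response earlier in this thread. The class name is hypothetical, and handling a single JSON object (as an error response might be shaped) is an assumption rather than documented behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper, not part of ksqlDb.RestApi.Client.
public static class StatementResponseInspector
{
    // Returns one "<status>: <statementText>" line per response element.
    public static List<string> Summarize(string responseContent)
    {
        var summaries = new List<string>();
        using JsonDocument document = JsonDocument.Parse(responseContent);
        JsonElement root = document.RootElement;

        if (root.ValueKind == JsonValueKind.Array)
        {
            // Successful command responses were logged as a JSON array.
            foreach (JsonElement item in root.EnumerateArray())
            {
                summaries.Add(Describe(item));
            }
        }
        else if (root.ValueKind == JsonValueKind.Object)
        {
            // ASSUMPTION: an error body may be a single object.
            summaries.Add(Describe(root));
        }

        return summaries;
    }

    private static string Describe(JsonElement item)
    {
        string status = "";
        if (item.TryGetProperty("commandStatus", out JsonElement commandStatus)
            && commandStatus.ValueKind == JsonValueKind.Object
            && commandStatus.TryGetProperty("status", out JsonElement statusElement)
            && statusElement.ValueKind == JsonValueKind.String)
        {
            status = statusElement.GetString() ?? "";
        }

        string statement = "";
        if (item.TryGetProperty("statementText", out JsonElement text)
            && text.ValueKind == JsonValueKind.String)
        {
            statement = text.GetString() ?? "";
        }

        return $"{status}: {statement}";
    }
}
```

Calling StatementResponseInspector.Summarize(responseContent) on a body like the logged one would yield a single entry that starts with "SUCCESS: " followed by the normalized statement text.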

Thank you very much too for trying out the RC version.

tomasfabian commented 2 years ago

ksqlDb.RestApi.Client version 1.5.0 has been released.