tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License
94 stars 25 forks source link

Opinionated default auto.offset.reset policy #11

Closed tomasfabian closed 2 years ago

tomasfabian commented 2 years ago

KSqlDbContextOptions should not be opinionated.

KSqlDBContextOptions created with a constructor or by KSqlDbContextOptionsBuilder are setting the auto.offset.reset policy to earliest by default. (Kafka.DotNet.ksqlDB nuget package version: <=1.9.0)

var ksqlDbUrl = @"http:\\localhost:8088";

var contextOptions = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb(ksqlDbUrl)
  .Options;

//or
contextOptions  = new KSqlDBContextOptions(ksqlDbUrl);

Expected behavior The default auto.offset.reset should be the same as is in ksqldb.

This will be a breaking change. In order to get the previous behavior, use the following config:

var contextOptions = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb(ksqlDbUrl)
  .SetAutoOffsetReset(AutoOffsetReset.Earliest)
  .Options;

Subscriptions to KTables shouldn't be affected by this change. Based on testing it changes only the behavior of subscriptions to KStreams:

await using var context = new KSqlDBContext(contextOptions);

var subscription = context.CreateQueryStream<Movie>()
        .Subscribe(onNext: movie =>
        {
          Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}");
          Console.WriteLine();
        }, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));

Environment:

Context: ksqlstreamsautooffsetreset configuration parameter

tomasfabian commented 2 years ago

This issue was fixed in kafka.dotnet.ksqldb 2.0.0.

I'm leaving this issue open as a reminder.

After the above mentioned release the package was renamed to ksqlDB.RestApi.Client 1.0.0.