tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

ConsumeAsync is not responding on CancellationToken (while waiting on EndOfStream / ReadLineAsync) #25

Closed: ArnoRienstra closed this issue 2 years ago

ArnoRienstra commented 2 years ago

Describe the bug: KSQL queries are not removed from ksqlDB after cancellation if no further records are received. CreateQueryStream calls ConsumeAsync, which does not respond to the CancellationToken while it is blocked on EndOfStream or ReadLineAsync.

To Reproduce: Create a push query that filters on a certain ID. If no new records are streamed, the query is not removed from ksqlDB after cancellation. In our case, when a device is stopped or removed, no new record is published, so the library keeps waiting in ReadLineAsync. Only once a new record arrives does the EndOfStream check return; the cancellation is then processed and the query is terminated in ksqlDB.

Expected behavior: Perhaps the CancellationToken could raise an exception on the waiting thread, to be handled by ConsumeAsync? Alternatively, once .NET 7 is released, pass the CancellationToken to ReadLineAsync?
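For reference, .NET 7 did add a StreamReader.ReadLineAsync overload that accepts a CancellationToken. The following is a minimal sketch of how a read loop could use it, assuming a .NET 7 or later target. CancellableLineReader and ReadLinesAsync are hypothetical names for illustration and are not part of ksqlDb.RestApi.Client; whether a pending read is actually interrupted also depends on the underlying stream honoring the token.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;

// Hypothetical helper for illustration only (requires .NET 7 or later).
public static class CancellableLineReader
{
  public static async IAsyncEnumerable<string> ReadLinesAsync(
    StreamReader streamReader,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
  {
    while (true)
    {
      // The .NET 7+ overload forwards the token to the underlying stream read,
      // so a pending read can end with an OperationCanceledException.
      var line = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false);

      // ReadLineAsync returns null once the end of the stream is reached.
      if (line is null)
        yield break;

      yield return line;
    }
  }
}
```

With this overload there is no need for a separate EndOfStream check, because the null return value signals the end of the stream.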

tomasfabian commented 2 years ago

Hi @ArnoRienstra, this is really unfortunate. Hopefully the .NET team will add EndOfStreamAsync and ReadLineAsync overloads that accept a CancellationToken.

Could you help me find a suitable solution, please? What do you think about registering an action on the cancellationToken? With the following approach, the while loop exits either when streamReader.EndOfStream reports the end of the stream or when the semaphoreSlim is released:

public async IAsyncEnumerable<T> Run<T>(object parameters, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
  logger?.LogInformation($"Executing query {parameters}");

  using var streamReader = await TryGetStreamReaderAsync<T>(parameters, cancellationToken).ConfigureAwait(false);

  // Starts with a count of 0, so WaitAsync blocks until cancellation releases it.
  using var semaphoreSlim = new SemaphoreSlim(0, 1);

  // Dispose the registration before the semaphore, so that a late cancellation
  // cannot call Release on an already disposed semaphore.
  using var registration = cancellationToken.Register(() => semaphoreSlim.Release());

  await foreach (var entity in ConsumeAsync<T>(streamReader, semaphoreSlim, cancellationToken).WithCancellation(cancellationToken).ConfigureAwait(false))
    yield return entity;
}

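// Despite its name, this returns true while more data may be available
// and false once the end of the stream has been reached.
private Task<bool> EndOfStreamAsync(StreamReader streamReader, CancellationToken cancellationToken)
{
  // EndOfStream can block synchronously, so it is evaluated on a thread-pool thread.
  return Task.Run(() =>
  {
    try
    {
      return !streamReader.EndOfStream;
    }
    catch (Exception)
    {
      // Only swallow the exception when it was caused by cancellation.
      if (!cancellationToken.IsCancellationRequested)
        throw;

      // The caller re-checks the token inside the loop body and exits there.
      return true;
    }
  }, cancellationToken);
}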

private async IAsyncEnumerable<T> ConsumeAsync<T>(StreamReader streamReader, SemaphoreSlim semaphoreSlim, [EnumeratorCancellation] CancellationToken cancellationToken)
{
  if (cancellationToken.IsCancellationRequested)
    yield break;

  // Completes with false as soon as the cancellation callback releases the semaphore.
  var cancellationTask = Task.Run(async () =>
  {
    await semaphoreSlim.WaitAsync().ConfigureAwait(false);
    return false;
  }, cancellationToken);

  // The first await yields whichever task finished first, the second await unwraps its bool result:
  // true means keep reading, false means end of stream or cancellation.
  while (await await Task.WhenAny(EndOfStreamAsync(streamReader, cancellationToken), cancellationTask).ConfigureAwait(false))
  {
    if (cancellationToken.IsCancellationRequested)
      yield break;

    var rawData = await streamReader.ReadLineAsync()
      .ConfigureAwait(false);

    logger?.LogDebug($"Raw data received: {rawData}");

    var record = OnLineRead<T>(rawData);

    if (record != null) yield return record.Value;
  }
}

Could you evaluate/test the above solution, please? Is streamReader.ReadLineAsync() still blocking in your case? If you can come up with a better solution please let me know.

Thank you! Regards Tomas.
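The race between the blocking read and the cancellation signal in the snippet above can also be isolated into a small helper. The sketch below uses a TaskCompletionSource in place of the SemaphoreSlim; this is a swapped-in technique for illustration, not what the library ships. CancellationRace and WithFallbackOnCancelAsync are hypothetical names.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class CancellationRace
{
  // Returns the result of `operation`, or `fallback` if the token fires first.
  public static async Task<T> WithFallbackOnCancelAsync<T>(
    Task<T> operation, T fallback, CancellationToken cancellationToken)
  {
    var cancelled = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Disposing the registration detaches the callback once the race is decided.
    using var registration = cancellationToken.Register(() => cancelled.TrySetResult(fallback));

    // WhenAny returns the task that completed first; awaiting it unwraps the result.
    var winner = await Task.WhenAny(operation, cancelled.Task).ConfigureAwait(false);
    return await winner.ConfigureAwait(false);
  }
}
```

Note that losing the race does not stop the abandoned operation. A read that is still blocked only ends when new data arrives or the underlying stream is disposed, which is why disposing the StreamReader after cancellation matters in the workaround above.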

ArnoRienstra commented 2 years ago

Hello Tomas,

Thank you for the swift response!

This seems to do the job, and we cannot think of a cleaner solution as long as .NET does not support cancellation in ReadLineAsync natively. We'll look out for the next package update.

Thanks again,

Regards, Arno

tomasfabian commented 2 years ago

Hi @ArnoRienstra, I published a new package version 2.1.2 with the fix.

Let me know whether everything is OK.

Thanks too.

Regards Tomas.

ArnoRienstra commented 2 years ago

Hello Tomas,

Thanks for your effort updating the package,

We updated to this release in order to close the issue, but there seems to be a problem with the targeted .NET version. Now that net6.0 is included as a target framework in the project file, the conditional compilation directive in KSqlDbQueryStreamProvider should be changed from #if NET5_0 to #if NET5_0_OR_GREATER. As it stands, this appears to be a breaking change for us and perhaps for other users.

https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/blob/290c354556c04888e36066da64bc92fd5ab4e58f/ksqlDb.RestApi.Client/KSql/RestApi/KSqlDbQueryStreamProvider.cs#L68
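To illustrate the difference between the two symbols: NET5_0 is defined only when compiling for exactly net5.0, whereas NET5_0_OR_GREATER is also defined for net6.0 and later targets. The following is a minimal sketch, not the code from KSqlDbQueryStreamProvider; TargetFrameworkInfo and Describe are hypothetical names.

```csharp
// Hypothetical class for illustration only.
public static class TargetFrameworkInfo
{
  public static string Describe()
  {
#if NET5_0_OR_GREATER
    // Compiled for net5.0, net6.0, and any later target.
    return "net5.0 or later";
#elif NETSTANDARD
    // Compiled for .NET Standard targets.
    return ".NET Standard";
#else
    return "other target framework";
#endif
  }
}
```

With #if NET5_0 in place of the first condition, a net6.0 build would skip that branch and fall through to the later ones.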

Regards, Arno

tomasfabian commented 2 years ago

I included net6.0 inadvertently. Would it be OK with you if I removed it and unlisted the 2.1.2 package version from NuGet?

I didn't want to introduce a breaking change, sorry!

I will replace NET5_0 with NET5_0_OR_GREATER going forward.

ArnoRienstra commented 2 years ago

Yes please, no problem!

tomasfabian commented 2 years ago

Please try it again with version 2.1.3. I apologize for the inconvenience.

ArnoRienstra commented 2 years ago

It's working as expected now, thanks!