tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Is there a timeout for SubscribeAsync? #17

Closed. DoubleE1 closed this issue 2 years ago.

DoubleE1 commented 2 years ago

Hi Tomas, I am currently building a long-running service that subscribes to a stream. After the stream has been idle for a while (around 30-60 minutes) with no new data inserted, any data that arrives afterwards is not received by the subscriber.

Question:

Code

var query = context.CreateQueryStream<MyClass>()
    .WithOffsetResetPolicy(AutoOffsetReset.Earliest)
    .SubscribeAsync(c => {
        Console.WriteLine($"{nameof(MyClass)}: {c.UserId} - {c.Weather} - {c.DeviceId}");       //not receiving any new data after idling for a period of time 
    },
    onError: ex => { Console.WriteLine($"Exception: {ex.Message}"); },
    onCompleted: () => { Console.WriteLine("Completed"); });

tomasfabian commented 2 years ago

Hi @DoubleE1, I would expect that in case of a timeout you would receive an error notification. Could you please help me investigate this issue further? I assume it is not easy to reproduce.

Could you try using Subscribe instead of SubscribeAsync, and also try changing the TargetFramework from netcoreapp3.1 to .NET 5?

Regards, Tomas.

tomasfabian commented 2 years ago

Hi @DoubleE1. If you (or someone else) would like to help me with this issue, please clone the repository for debugging purposes and reference the ksqlDB.RestApi.Client project in your solution instead of the NuGet package.

I would suggest examining the behavior of the KsqlDbProvider class. PRs are welcome.

Thank you, Tomas

DoubleE1 commented 2 years ago

Hi @tomasfabian

I have tried subscribing to the stream on both .NET 5.0 and .NET Core 3.1, using both Subscribe and SubscribeAsync, but I still don't get a timeout exception after reproducing the same steps as described above.

I will examine the class you mentioned and get back to you if I notice any possible issue.

Thank you.

DoubleE1 commented 2 years ago

Hi @tomasfabian.

After some debugging and searching, I noticed that when HttpClient is used, the StreamReader stops reading from the network stream after being idle for a while (I suspect 15 minutes or more). However, I can't tell for sure whether this is the root cause of the issue described above.

I have created a sample project to simulate the timeout issue. Feel free to take a look. Here's a similar issue found on Stack Overflow.

Just wondering:

I'm currently trying to get the timeout exception, but I'm still figuring out where the timeout comes from.
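To make the failure mode above concrete, here is a minimal sketch of how a line-delimited push query response is typically read with a StreamReader. It is not the library's KsqlDbProvider code; the class and method names are hypothetical, and the newline-delimited body is an assumption based on the "Raw data received" output later in this thread. The key point is that ReadLineAsync only completes when a full line arrives or the stream ends, so a connection that silently dies (no FIN/RST reaching the client) leaves the loop waiting forever instead of raising an error, which would be consistent with onError never being called.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class PushQueryStreamReader
{
    // Hypothetical helper, not part of ksqlDB.RestApi.Client.
    // In a real client the body would come from something like:
    //   var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    //   var body = await response.Content.ReadAsStreamAsync();
    public static async Task<int> ReadRowsAsync(Stream body, Action<string> onRow)
    {
        int count = 0;
        using var reader = new StreamReader(body, Encoding.UTF8);

        // ReadLineAsync returns null only when the stream ends.
        // If the TCP connection dies silently, this await simply never completes.
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (line.Length == 0)
            {
                continue; // skip empty lines
            }

            onRow(line);
            count++;
        }

        return count;
    }
}
```

Feeding it a MemoryStream with the header line and two rows invokes onRow three times and returns 3; with a real network stream the call only returns once the server closes the response.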

tomasfabian commented 2 years ago

@DoubleE1 have you also tried the solution suggested on Stack Overflow, i.e. setting the timeouts to infinite?

httpClient.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);

and

stream.ReadTimeout = Timeout.Infinite;

DoubleE1 commented 2 years ago

Hi @tomasfabian, yes, I did try the suggested solution, but I got an exception saying 'Timeouts are not supported on this stream', as shown in the image below. I haven't looked into this yet.

It seems that not every Stream supports ReadTimeout. See Microsoft's documentation on the Stream.ReadTimeout property:

Exceptions

InvalidOperationException: The ReadTimeout method always throws an InvalidOperationException.

I would appreciate any recommended solution for this.
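A minimal sketch of applying both suggestions without hitting that exception, assuming the goal is only to avoid the throw: Stream.CanTimeout reports whether ReadTimeout is supported, so the setter can be guarded. The helper names are hypothetical. The quoted message matches the base Stream implementation, which suggests the HTTP response stream simply doesn't override ReadTimeout; in that case this guard avoids the exception but cannot add a read timeout.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;

public static class TimeoutSettings
{
    // Hypothetical helper: disables HttpClient's overall request timeout.
    // Timeout.InfiniteTimeSpan is the same value as TimeSpan.FromMilliseconds(Timeout.Infinite).
    public static HttpClient CreateClientWithoutTimeout() =>
        new HttpClient { Timeout = Timeout.InfiniteTimeSpan };

    // Hypothetical helper: sets ReadTimeout only on streams that support it.
    // Returns false instead of letting the setter throw InvalidOperationException.
    public static bool TrySetInfiniteReadTimeout(Stream stream)
    {
        if (!stream.CanTimeout)
        {
            return false;
        }

        stream.ReadTimeout = Timeout.Infinite;
        return true;
    }
}
```

For example, a MemoryStream reports CanTimeout as false, so TrySetInfiniteReadTimeout returns false for it rather than throwing.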

[Image: Exception]

DoubleE1 commented 2 years ago

Hi @tomasfabian

Unfortunately, I couldn't find a way to either extend the stream timeout or get a timeout exception from the stream.

I have tried the following methods:

Note: according to this post, the stream itself has its own read and write timeouts (15 minutes by default), so if 15 minutes pass without any data being transferred, the stream stops reading.

tomasfabian commented 2 years ago

Hi @DoubleE1, thank you for your observations. I will try to dig deeper into this issue.

But the solution is questionable for the following reason. Even if we could return the timeout exception, AFAIK there is no way to reconnect to the same push query after an error or a lost connection. I posted a question on Stack Overflow a few months ago and it wasn't answered, so it is probably not possible at all. This would have to be implemented in ksqlDB itself. My understanding is that with the REST API we don't have any of the following processing guarantees:

But I could be wrong, because I didn't find any documentation about it. This is just my assumption.

My approach is to use a lower-level API and consume the messages of a persistent push query directly from its underlying topic. See an example. This way you can provide a GroupId, scale out your application, configure exactly-once processing, etc. So for now it may even be a better solution rather than just a workaround.

The downside is that you need access to the underlying topic, you have to provide BootstrapServers in the configuration, and you probably need to add yet another package, such as Confluent.Kafka, if you haven't added it already.

Hopefully this will help you and enable you to proceed further.

tomasfabian commented 2 years ago

This is the only documentation about processing guarantees in ksqlDB that I'm aware of, but I'm not sure how to interpret it in conjunction with the REST API, since I didn't find a way to resume the original processing (see my previous comment).

Let me know what you think, too. Thank you.

PS: if anyone can prove my assumptions wrong, I will be super happy :)

tomasfabian commented 2 years ago

Hi @DoubleE1, I tried your sample project and, alongside it, I also ran a simple console application from Visual Studio. Both received a value after more than 3.5 hours.

Output from your project:

Configuration Result:
[Success] Name KafkaTest
[Success] ServiceName KafkaTest
Topshelf v4.3.0.0, .NET 5.0.13 (5.0.13)
The KafkaTest service is now running, press Control+C to exit.
CreateStreamAsync: False - BadRequest
04:19:25 - Raw data received: {"queryId":"8a20cf26-2959-4956-a203-c5b9445f961e","columnNames":["TITLE","ID","RELEASEYEAR"],"columnTypes":["STRING","INTEGER","INTEGER"]}
04:19:25 - Raw data received: ["Aliens",1,1986]
04:19:25 - Raw data received: ["Die Hard",2,1998]
04:19:25 - Raw data received: ["3",3,1982]
08:05:54 - Raw data received: ["4",4,1982]

The bottom line is that I was not able to reproduce your issue.

EDIT: I ran ksqlDB from this docker-compose file in Docker Desktop.

DoubleE1 commented 2 years ago

Hi @tomasfabian,

Thank you for trying out the sample project. It seems that the issue wasn't caused by the code itself but possibly by the configuration or settings of the local machine/server (although I have tried several different machines and servers).

I am using a Linux server to host the ksqlDB server in Docker. This is the docker-compose.yml file I'm currently using:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - 8088:8088
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksqldb/log4j.properties"
      KSQL_BOOTSTRAP_SERVERS: "kafka:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
    depends_on:
      - kafka
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    entrypoint: /bin/sh
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
    tty: true
    depends_on:
      - ksqldb-server

Since you verified that the code works fine on your end, I think we can close this issue. I really appreciate your time and effort in looking into it. I'll try to find the root cause on my end and will hopefully be able to share my solution here.

tomasfabian commented 2 years ago

Hi @DoubleE1, have you found a solution for your timeout issue?

Here is a similar discussion with a solution for the Windows platform; maybe it will help someone: TCP Reset after 5mins from Azure Load Balancer

DoubleE1 commented 2 years ago

Hi @tomasfabian,

Unfortunately not. I didn't manage to resolve the timeout issue with HttpClient; instead, I switched from HttpClient to HttpWebRequest, which resolved it. (This was a fairly large change, made outside of the package layer.)

Regarding WinHttpHandler, which might not be supported on Linux or in Linux containers, I'm not sure whether this Stack Overflow answer will help. I will definitely spend some time looking into this when I'm available.
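A possible cross-platform alternative to the Windows-only WinHttpHandler is to enable TCP keep-alive through SocketsHttpHandler.ConnectCallback (available from .NET 5). This is a sketch under the assumption that something between client and server (a load balancer, NAT or firewall) drops idle connections, as in the Azure discussion above; that cause is not confirmed in this thread. The class name and the 60/10 second values are hypothetical, and whether such an HttpClient can be passed to ksqlDB.RestApi.Client depends on the package version.

```csharp
using System;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;

public static class KeepAliveHttp
{
    // Enables TCP keep-alive probes so idle connections keep generating traffic
    // and dead peers are eventually detected instead of hanging forever.
    public static void EnableKeepAlive(Socket socket, int idleSeconds, int intervalSeconds)
    {
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        // Seconds of idleness before the first probe is sent.
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, idleSeconds);
        // Seconds between unanswered probes.
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, intervalSeconds);
    }

    // Builds an HttpClient whose connections are opened with keep-alive enabled (.NET 5+).
    public static HttpClient CreateClient(int idleSeconds = 60, int intervalSeconds = 10)
    {
        var handler = new SocketsHttpHandler
        {
            ConnectCallback = async (context, cancellationToken) =>
            {
                var socket = new Socket(SocketType.Stream, ProtocolType.Tcp) { NoDelay = true };
                try
                {
                    EnableKeepAlive(socket, idleSeconds, intervalSeconds);
                    await socket.ConnectAsync(context.DnsEndPoint, cancellationToken);
                    return new NetworkStream(socket, ownsSocket: true);
                }
                catch
                {
                    socket.Dispose();
                    throw;
                }
            }
        };

        // No overall timeout: a push query response is expected to stay open indefinitely.
        return new HttpClient(handler) { Timeout = Timeout.InfiniteTimeSpan };
    }
}
```

The design choice here is to keep the default HTTP stack and only change how sockets are created, rather than replacing HttpClient with HttpWebRequest as described above.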

Thank you very much, really appreciate your help!

tomasfabian commented 2 years ago

In case someone is facing issues with HttpClient: I redesigned the way it is injected. Hopefully this will provide a more stable implementation.

Thanks, Tomas.