confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
125 stars 1.04k forks source link

VertxException "Connection was closed" in StreamedQueryResultImpl after closing Client #7776

Open mikebin opened 3 years ago

mikebin commented 3 years ago

Describe the bug Java client StreamedQueryResultImpl class throws the following exception after executing a synchronous query and subsequently calling Client#close:

Unexpected error while polling: java.lang.Exception: io.vertx.core.VertxException: Connection was closed

This does not actually cause any issues in query execution, but can be confusing for developers, especially since it's logged at ERROR level.

To Reproduce Version: 0.18.0/6.2.0

Example app which reproduces the issue:

package streams;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

public class SyncQuery {
  public static Logger log = LoggerFactory.getLogger(SyncQuery.class);
  public static String KSQLDB_SERVER_HOST = "localhost";
  public static int KSQLDB_SERVER_HOST_PORT = 8088;

  public static void main(String[] args) throws Exception {
    System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
    io.vertx.core.logging.LoggerFactory.initialise();

    ClientOptions options = ClientOptions.create()
        .setHost(KSQLDB_SERVER_HOST)
        .setPort(KSQLDB_SERVER_HOST_PORT);
    Client client = Client.create(options);

    Map<String, Object> properties = Collections.singletonMap("auto.offset.reset", "earliest");

    try {
      StreamedQueryResult streamedQueryResult = client
          .streamQuery("select * from s emit changes;", properties).get();
      final long TEN_SECONDS = 10 * 1000L;
      long startTime = System.currentTimeMillis();
      while ((System.currentTimeMillis() - startTime) < TEN_SECONDS) {
        Row row = streamedQueryResult.poll(Duration.ofSeconds(2));
        if (Objects.nonNull(row)) {
          System.out.println(row);
        }
      }
      log.info("End query");
    } catch (Exception e) {
      log.error("Exception in query", e);
    } finally {
      log.info("Closing client");
      client.close();
    }
  }
}

Expected behavior Clean shutdown, with no error logs.

Actual behaviour

2021-07-09 00:59:16 INFO  SyncQuery:43 - End query
2021-07-09 00:59:16 INFO  SyncQuery:47 - Closing client
2021-07-09 00:59:16 ERROR StreamedQueryResultImpl:123 - Unexpected error while polling: java.lang.Exception: io.vertx.core.VertxException: Connection was closed

Additional context Looking at the code for that StreamedQueryResultImpl.java, it appears to use an async subscription (PollableSubscriber) internally, even when polling.

I’m assuming what’s happening is the user is closing the ksqlDB Client while the PollableSubscriber is still active, and that leads to the exception. Is there a way to gracefully complete/close the StreamedQueryResult so that the PollableSubscriber terminates before the Client is closed, or is there a different way the code should be written which would trigger a proper automatic cleanup?

I see a method in the base class BufferedPublisher#complete which looks like it could stop the internal subscriber, but not sure it’s expected for end-users to call that method directly.

mikebin commented 3 years ago

Adding the following before closing the Client eliminates the exception, so maybe adding an example to the docs is all that's needed, unless this cleanup can be simplified at the code/API level.

      client.terminatePushQuery(streamedQueryResult.queryID());
      Row row = streamedQueryResult.poll();
      while(row != null) {
        row = streamedQueryResult.poll();
      }