confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KsqlDB Java API Client hangs after terminatePushQuery() #7559

Open jahner2016 opened 3 years ago

jahner2016 commented 3 years ago

Hi Team,

I have ksqlDB (0.17) installed and use the Java API Client to query streams. When I try to terminate a query using terminatePushQuery(queryId).get() with a valid queryId (returned from the streamQuery() call), the following error is logged and the call hangs:

io.vertx.core.json.DecodeException: Failed to decode:Unrecognized token 'Internal': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

It seems that the server responds with an Internal Server Error, which of course cannot be parsed as JSON.

Kind regards Iggy
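While the cause is being investigated, a bounded wait can keep the caller from blocking forever on the hang described above. The following is only a minimal sketch using the JDK: a plain CompletableFuture<Void> stands in for the future returned by terminatePushQuery(), and the class name BoundedTerminate, the method awaitTermination, and the 200 ms timeout are hypothetical choices for illustration, not part of the ksqlDB client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedTerminate {

  /**
   * Waits for the future for at most timeoutMs milliseconds.
   * Returns true if it completed normally, false on timeout, failure, or interrupt.
   */
  public static boolean awaitTermination(CompletableFuture<Void> future, long timeoutMs) {
    try {
      future.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      System.out.println("No response within " + timeoutMs + " ms; giving up");
      return false;
    } catch (ExecutionException e) {
      System.out.println("Terminate failed: " + e.getCause());
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    // Stand-in (assumption) for a terminate call whose response never arrives
    CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
    System.out.println(awaitTermination(neverCompletes, 200));

    // Stand-in (assumption) for a terminate call that succeeded
    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    System.out.println(awaitTermination(done, 200));
  }
}
```

In a real application the future passed in would be the one returned by client.terminatePushQuery(queryId). This does not fix the underlying problem; it only turns an indefinite hang into a timeout the caller can react to.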

vvcephei commented 3 years ago

@cprasad1 to try and repro this

cprasad1 commented 3 years ago

Hello @jahner2016, I can't reproduce this issue on our end on 0.17, 0.18 or master. Can you give us more information about your ksqlDB setup (docker/tarball installation etc) and the queries that you are trying to run?

I have this docker setup for 0.17:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.17.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

and I have a Java client app that waits for 3 rows from my pageviews stream, terminates the push query, and then starts listening for more rows again:

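import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import java.util.concurrent.ExecutionException;

  // KSQLDB_SERVER_HOST and KSQLDB_SERVER_HOST_PORT are constants defined elsewhere in the class (not shown)
  public static void main(final String[] args) throws ExecutionException, InterruptedException {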
    app();
  }

  public static void app() throws ExecutionException, InterruptedException {
    System.out.println("*** cp has started debugging ***");
    ClientOptions options = ClientOptions.create()
            .setHost(KSQLDB_SERVER_HOST)
            .setPort(KSQLDB_SERVER_HOST_PORT);
    Client client = Client.create(options);

    String sql = "CREATE STREAM pageviews (\n" +
            "    page_id BIGINT,\n" +
            "    viewtime BIGINT,\n" +
            "    user_id VARCHAR\n" +
            "  ) WITH (\n" +
            "    KAFKA_TOPIC = 'keyless-pageviews-topic',\n" +
            "    VALUE_FORMAT = 'JSON', partitions=1\n" +
            "  );";
    client.executeStatement(sql).get();

    StreamedQueryResult streamedQueryResult = client.streamQuery("SELECT * FROM pageviews EMIT CHANGES;").get();

    for (int i = 0; i < 3; i++) {
      // Block until a new row is available
      Row row = streamedQueryResult.poll();
      if (row != null) {
        System.out.println("Received a row!");
        System.out.println("Row: " + row.values());
        System.out.println("q_id: " + streamedQueryResult.queryID());
      } else {
        System.out.println("Query has ended.");
      }
    }
    System.out.println("terminating push query");
    //client.terminatePushQuery("hello").get();
    client.terminatePushQuery(streamedQueryResult.queryID()).get();
    System.out.println("successfully terminated");

    streamedQueryResult = client.streamQuery("SELECT * FROM pageviews EMIT CHANGES;").get();
    for (int i = 0; i < 3; i++) {
      // Block until a new row is available
      Row row = streamedQueryResult.poll();
      if (row != null) {
        System.out.println("Received a row!");
        System.out.println("Row: " + row.values());
        System.out.println("q_id: " + streamedQueryResult.queryID());
      } else {
        System.out.println("Query has ended.");
      }
    }
    System.out.println("terminating push query");
    //client.terminatePushQuery("hello").get();
    client.terminatePushQuery(streamedQueryResult.queryID()).get();
    System.out.println("successfully terminated");
    client.close();
  }

I don't see the io.vertx.core.json.DecodeException, and the client doesn't hang after terminating the push query. It continues to accept 3 more rows. I can also see the shutdown being called and successfully going through in the logs of the ksqlDB server.

jahner2016 commented 3 years ago

Hi cprasad1,

I have the following docker setup:


version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:

And I'm working with the standalone example on https://ksqldb.io/quickstart.html (the riderlocations stream).

If I understand your example correctly, you always terminate the query while no longer polling. What happens if you terminate the query while you are still polling?

Kind regards Iggy

jahner2016 commented 3 years ago

I've found the following problem:

When I start the stream query in one thread and try to terminate the query in another thread, I get the following error in the ksqlDB logs:

ksqldb-server | [2021-06-10 14:43:40,662] ERROR Failed to handle request -1 /close-query (io.confluent.ksql.api.server.FailureHandler:38)
ksqldb-server | java.lang.IllegalStateException: On wrong context or worker
ksqldb-server |     at io.confluent.ksql.util.VertxUtils.checkContext(VertxUtils.java:38)
ksqldb-server |     at io.confluent.ksql.api.server.ConnectionQueryManager.checkContext(ConnectionQueryManager.java:59)
ksqldb-server |     at io.confluent.ksql.api.server.ConnectionQueryManager.access$100(ConnectionQueryManager.java:30)
ksqldb-server |     at io.confluent.ksql.api.server.ConnectionQueryManager$ConnectionQueries.removeQuery(ConnectionQueryManager.java:79)
ksqldb-server |     at io.confluent.ksql.api.server.PushQueryHolder.close(PushQueryHolder.java:50)
...

Maybe that helps in finding the error.

Kind regards Iggy
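The stack trace above shows a context check (VertxUtils.checkContext) rejecting the /close-query request with an IllegalStateException. The sketch below is only an analogy using plain JDK threads, not ksqlDB's or Vert.x's actual code: a hypothetical OwnedResource remembers the thread that created it and refuses removal from any other thread. The names ContextCheckDemo, OwnedResource, and tryRemove are invented for illustration, and the assumption that the server-side check works along these lines is not confirmed anywhere in this thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextCheckDemo {

  /** Hypothetical resource that may only be removed from the thread that created it. */
  static final class OwnedResource {
    private final Thread owner = Thread.currentThread();

    void remove() {
      // Analogous in spirit to a context check: reject calls from a foreign thread
      if (Thread.currentThread() != owner) {
        throw new IllegalStateException("On wrong context or worker");
      }
    }
  }

  /**
   * Creates the resource on one single-threaded executor and removes it either
   * on the same executor or on a different one.
   * Returns null on success, otherwise the message of the failure.
   */
  public static String tryRemove(boolean fromOwnerThread) {
    ExecutorService ownerThread = Executors.newSingleThreadExecutor();
    ExecutorService otherThread = Executors.newSingleThreadExecutor();
    try {
      Callable<OwnedResource> create = OwnedResource::new;
      OwnedResource resource = ownerThread.submit(create).get();

      Runnable remove = resource::remove;
      (fromOwnerThread ? ownerThread : otherThread).submit(remove).get();
      return null;
    } catch (ExecutionException e) {
      // The IllegalStateException thrown inside the task arrives as the cause
      return e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } finally {
      ownerThread.shutdown();
      otherThread.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("owner thread: " + tryRemove(true));
    System.out.println("other thread: " + tryRemove(false));
  }
}
```

If the server behaves similarly, this would be consistent with the observation that terminating from a second thread fails while the single-threaded test program earlier in the thread succeeds, but that remains a hypothesis until it is reproduced against ksqlDB itself.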