confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Huge delay until getting the first byte of the response body with "print 'topic_name'" via the KSQL REST API #3187

Open Vetrenik opened 5 years ago

Vetrenik commented 5 years ago

I have a problem when sending a "print 'topic'" request to the KSQL REST API: it takes nearly three minutes until I receive the first byte of the response body, even though the KSQL server reports that printing starts immediately. I am using Apache HttpClient.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.json.JSONObject;

import kafka.restapi.misc.HTTPHeader;

public class KSQL_rest_test {

    public static JSONObject executeKSQL(String url, List<HTTPHeader> reqHeaders, JSONObject config)
            throws InterruptedException, IOException {
        JSONObject res = new JSONObject();
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        CloseableHttpResponse response;

        try {
            HttpPost postRequest;
            // Statements go to /ksql; streaming queries (select/print) go to /query.
            if (!config.getString("ksql").contains("select") && !config.getString("ksql").contains("print")) {
                postRequest = new HttpPost(url + "/ksql");
            } else {
                postRequest = new HttpPost(url + "/query");
            }
            reqHeaders.forEach((h) -> postRequest.addHeader(h.gethName(), h.gethValue()));
            postRequest.setEntity(new StringEntity(config.toString()));

            response = httpClient.execute(postRequest);
            try {
                HttpEntity ent = response.getEntity();
                System.out.println("Entity is chunked : " + ent.isChunked());
                System.out.println("Entity is streaming : " + ent.isStreaming());

                InputStream is = ent.getContent();
                DataInputStream dis = new DataInputStream(is);

                try {
                    // Measure how long the first byte of the body takes to arrive.
                    long start = System.currentTimeMillis();
                    System.out.print((char) dis.readByte());
                    long finish = System.currentTimeMillis();
                    long timeConsumedMillis = finish - start;
                    System.out.println(timeConsumedMillis);
                } catch (IOException e) {
                    System.err.println(dis.available());
                    e.printStackTrace(System.err);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    response.close();
                } catch (IOException ex) {
                    Logger.getLogger(KSQL_rest_test.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        } catch (IOException e) {
            e.printStackTrace(System.err);
        } finally {
            try {
                httpClient.close();
            } catch (IOException ex) {
                ex.printStackTrace(System.err);
            }
        }

        return res;
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        List<HTTPHeader> baseKSQLRequestReqHeaders = new ArrayList<>();
        baseKSQLRequestReqHeaders.add(new HTTPHeader("Accept", "application/vnd.ksql.v1+json"));
        baseKSQLRequestReqHeaders.add(new HTTPHeader("Content-Type", "application/vnd.ksql.v1+json; charset=UTF-8"));
        JSONObject config = new JSONObject();
        config.put("ksql", "print 'bininfo-countries-connector';");

        System.out.print(executeKSQL("http://192.168.76.81:8088", baseKSQLRequestReqHeaders, config));
    }
}
```

apurvam commented 5 years ago

The vanilla print will only display messages as they are produced. What's the production rate on the topic?
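If the goal is to see records that already exist on the topic rather than wait for new ones to be produced, KSQL's PRINT statement also accepts a FROM BEGINNING clause (a sketch; exact clause support depends on the KSQL version in use):

```sql
-- Print existing records from the start of the topic instead of
-- only new arrivals; LIMIT ends the stream after N records.
PRINT 'bininfo-countries-connector' FROM BEGINNING LIMIT 5;
```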

Vetrenik commented 5 years ago

> The vanilla print will only display messages as they are produced. What's the production rate on the topic?

6 seconds

Also, if I set a limit on the print output, I get a result like this:

```
Format:AVRO
8/13/19 11:12:01 AM YEKT, null, {"id": 185, "name_eng": "Russian Federation", "name_native": "Российская Федерация", "currency_name": "RUB", "code": 643}
8/13/19 11:12:01 AM YEKT, null, {"id": 1, "name_eng": "Afghanistan", "name_native": null, "currency_name": null, "code": 4}
```

As you can see, the result format is neither Avro nor JSON. How should I parse it without cutting the JSON substrings out of each line by hand?

OneCricketeer commented 5 years ago

Most string processing libraries have a split operator that accepts a limit of splits.

You have 3 columns here, the last of which is indeed JSON: the value of the deserialized Avro message.
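The split-with-a-limit approach might look like this (a minimal sketch, assuming each print line has the form `timestamp, key, value` as in the sample output above):

```java
public class PrintLineParser {
    public static void main(String[] args) {
        // A sample line from ksql's print output: timestamp, key, then the JSON value.
        String line = "8/13/19 11:12:01 AM YEKT, null, {\"id\": 185, \"name_eng\": \"Russian Federation\"}";

        // Limit the split to 3 parts so commas inside the JSON payload are not split on.
        String[] parts = line.split(",\\s*", 3);
        String timestamp = parts[0]; // "8/13/19 11:12:01 AM YEKT"
        String key = parts[1];       // "null"
        String json = parts[2];      // the intact JSON value, ready for a JSON parser

        System.out.println(json);
    }
}
```

Because the timestamp and key never contain commas in this format, the first two splits always land before the JSON payload, leaving it intact in the third part.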

Though, if you're using Java, why not just write a regular consumer?
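A regular consumer along those lines might look like this (a sketch only; the broker address and group id are assumptions, and StringDeserializer stands in for Confluent's KafkaAvroDeserializer, which would need a schema.registry.url to decode the Avro values properly):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address is an assumption; substitute your cluster's bootstrap servers.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "bininfo-debug");
        // Read from the start of the topic when no committed offset exists.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // For Avro values you would normally use io.confluent.kafka.serializers.KafkaAvroDeserializer;
        // StringDeserializer keeps this sketch free of the Schema Registry dependency.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("bininfo-countries-connector"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}
```

Unlike the REST /query endpoint, this delivers records as soon as poll() returns them, with no HTTP chunking or server-side buffering in between.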