confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL problem with `print <topic> from beginning` #2386

Open authorjapps opened 5 years ago

authorjapps commented 5 years ago

Hello, I have hit this issue while querying the KSQL server.

Via REST API call - String SerDe

url:http://localhost:8088/query
method:POST
request:
{
  "headers" : {
    "Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
    "Accept" : "application/vnd.ksql.v1+json"
  },
  "body" : {
    "ksql" : "print 'demo-ksql' from beginning;",
    "streamsProperties" : { }
  }
} 

Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Wed, 23 Jan 2019 07:41:41 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Transfer-Encoding" : [ "chunked" ],
    "Vary" : [ "Accept-Encoding, User-Agent" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}

Via REST API call - KafkaAvro SerDe

url:http://localhost:8088/query
method:POST
request:
{
  "headers" : {
    "Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
    "Accept" : "application/vnd.ksql.v1+json"
  },
  "body" : {
    "ksql" : "print 'demo-ksql' from beginning;",
    "streamsProperties" : { }
  }
} 

Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Transfer-Encoding" : [ "chunked" ],
    "Vary" : [ "Accept-Encoding, User-Agent" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}

Via KSQL-cli

ksql> print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
ksql> 

Console consumer output

$ docker exec compose_kafka_1  kafka-console-consumer --bootstrap-server localhost:29092 --topic demo-ksql  --from-beginning
8Hello, Created for KSQL demo
8Hello, Created for KSQL demo
Hello World
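A possible explanation for the stray leading `8` on the first two lines (an assumption on my part, not confirmed anywhere in this thread): if those two records were written with plain Avro binary string encoding, the value is prefixed with its byte length as a zigzag varint, and for this 28-byte string that prefix is the single byte 56, i.e. ASCII `8`. That would also mean the topic held a mix of encodings. A quick sanity check:

```python
def avro_length_prefix(s: str) -> bytes:
    """Avro's binary encoding prefixes a string with its byte length,
    zigzag-mapped and emitted as a base-128 varint."""
    n = len(s.encode("utf-8"))
    z = n << 1  # zigzag of a non-negative int is just 2*n
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)

# 28 bytes -> zigzag 56 -> a single varint byte 56, which is ASCII '8'
print(avro_length_prefix("Hello, Created for KSQL demo"))  # b'8'
```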

KSQL-Server log

[2019-01-23 07:41:42,331] INFO 192.168.32.1 - - [23/Jan/2019:07:41:41 +0000] "POST /query HTTP/1.1" 200 84  568 (io.confluent.rest-utils.requests:60)
[2019-01-23 07:45:43,328] INFO Building AST for print 'demo-ksql' from beginning;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-23 07:45:43,352] INFO Printing topic 'demo-ksql' (io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource:132)
[2019-01-23 07:45:43,372] ERROR Exception encountered while writing to output stream (io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter:112)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
    at io.confluent.ksql.rest.server.resources.streaming.TopicStream$Format$2$1.print(TopicStream.java:132)
    at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.lambda$format$1(TopicStream.java:75)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)

Docker file and tests - to reproduce (in case you want to run it locally)

Just run as JUnit (right-click and run these tests) after you bring up the Docker environment.

    //@Ignore ("Issue still exists")
    @Test
    @JsonTestCase("kafka/consume/ksql/WIP_ISSUE_test_ksql_print_records.json")
    public void testKafkaConsume_printTopicRaw() throws Exception {
    }

    //@Ignore ("Issue still exists")
    @Test
    @JsonTestCase("kafka/consume/ksql/WIP_ISSUE_test_ksql_print_records_json.json")
    public void testKafkaConsume_printTopicJson() throws Exception {
    }

Then observe the KSQL server log and IDE console log.

:::Note:::

I will try producing JSON records and post the logs here when I get a chance. I will also try with kafkacat as advised by @rmoff.

rmoff commented 5 years ago

Can you explain clearly how these records are being produced onto the topic? I can't see this from your ticket so far. Thanks.

authorjapps commented 5 years ago

Just wondering how that is relevant here, mate. The records were produced by a Java client, and the console consumer displays them.

Anyway, attached are screenshots of the producer runtime debug session:


The Java producer runtime log is here:

---------------------------------------------------------
kafka.bootstrap.servers - localhost:9092
---------------------------------------------------------
2019-01-24 00:04:37,558 [main] INFO  org.jsmart.zerocode.core.kafka.client.BasicKafkaClient - brokers:localhost:9092, topicName:demo-ksql, operation:produce, requestJson:{"records":[{"key":"1548288277538","value":"Hello Created for KSQL demo"}]}
2019-01-24 00:04:37,600 [main] INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = zerocode-producer
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-01-24 00:04:37,781 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-01-24 00:04:37,782 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-01-24 00:04:37,846 [main] WARN  org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper - Could not find path '$.recordType' in the request. returned default type 'RAW'.
2019-01-24 00:04:37,856 [main] INFO  org.jsmart.zerocode.core.kafka.send.KafkaSender - Sending record number: 0

2019-01-24 00:04:50,361 [main] INFO  org.jsmart.zerocode.core.kafka.send.KafkaSender - Synchronous Producer sending record - ProducerRecord(topic=demo-ksql, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1548288277538, value=Hello Created for KSQL demo, timestamp=null)
2019-01-24 00:08:41,772 [kafka-producer-network-thread | zerocode-producer] WARN  org.apache.kafka.clients.NetworkClient - [Producer clientId=zerocode-producer] Error while fetching metadata with correlation id 1 : {demo-ksql=LEADER_NOT_AVAILABLE}
2019-01-24 00:08:41,773 [kafka-producer-network-thread | zerocode-producer] INFO  org.apache.kafka.clients.Metadata - Cluster ID: 1pdIIv-xTbezDU-kUE6eHA
2019-01-24 00:08:41,912 [main] INFO  org.jsmart.zerocode.core.kafka.send.KafkaSender - Record was sent to partition- 0, with offset- 0 
2019-01-24 00:08:41,918 [main] INFO  org.jsmart.zerocode.core.kafka.send.KafkaSender - deliveryDetails- {"status":"Ok","recordMetadata":{"offset":0,"timestamp":1548288521888,"serializedKeySize":13,"serializedValueSize":27,"topicPartition":{"hash":749715182,"partition":0,"topic":"demo-ksql"}}}
2019-01-24 00:08:41,919 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=zerocode-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-01-24 00:08:41,933 [main] INFO  org.jsmart.zerocode.core.runner.StepNotificationHandler - 
***Step PASSED:Print Topic Records via KSQL query->load_kafka
2019-01-24 00:08:41,936 [main] INFO  org.jsmart.zerocode.core.runner.ZeroCodeMultiStepsScenarioRunnerImpl - 
--------- TEST-STEP-CORRELATION-ID: c874d242-d68b-4699-a608-fb86a13976c0 ---------
*requestTimeStamp:2019-01-24T00:04:37.549
step:load_kafka
url:kafka-topic:demo-ksql
method:produce
request:
{
  "records" : [ {
    "key" : "1548288277538",
    "value" : "Hello Created for KSQL demo"
  } ]
} 
--------- TEST-STEP-CORRELATION-ID: c874d242-d68b-4699-a608-fb86a13976c0 ---------
Response:
{
  "status" : "Ok",
  "recordMetadata" : {
    "offset" : 0,
    "timestamp" : 1548288521888,
    "serializedKeySize" : 13,
    "serializedValueSize" : 27,
    "topicPartition" : {
      "hash" : 749715182,
      "partition" : 0,
      "topic" : "demo-ksql"
    }
  }
}
*responseTimeStamp:2019-01-24T00:08:41.927 
*Response delay:244378.0 milli-secs 
---------> Assertion: <----------
{
  "status" : "Ok"
} 
-done-

:::Note here please:::

This time there is no comma in the record (I removed the comma), i.e.

"Hello Created for KSQL demo"

Earlier it was

"Hello, Created for KSQL demo"

and the ksql-cli now shows the record.

ksql> print 'demo-ksql' from beginning;
Format:STRING
1/24/19 12:08:41 AM UTC , 1548288277538 , Hello Created for KSQL demo

Was the comma creating the issue, or was the chosen SerDe incorrect?

So far so good 👍.

Now the new issue is

Over the REST call for the same command (see the attached screenshots), the REST client stays on Loading... and never returns anything.

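One plausible reason for the Loading... behaviour (my reading, not confirmed in this thread): the /query endpoint streams its results back as a long-lived chunked HTTP response, so a REST client that waits for the complete body before rendering will sit there forever. A client has to consume the response incrementally. A minimal Python sketch, with a file-like object standing in for the open network stream (the row payloads are illustrative only):

```python
import io

def stream_rows(resp):
    """Yield newline-delimited rows from a streaming /query response.

    `resp` can be any file-like object; a real client would pass the
    open HTTP response stream and read it line by line, because the
    server keeps the chunked connection open while the query runs.
    """
    for raw in resp:
        line = raw.strip()
        if line:
            yield line.decode()

# Stand-in for a chunked response body that is still being written:
fake = io.BytesIO(b'{"row":{"columns":[1]}}\n\n{"row":{"columns":[2]}}\n')
rows = list(stream_rows(fake))
print(rows)
```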
apurvam commented 5 years ago

Thanks for the detailed message @authorjapps . This is strange indeed, especially the part about removing the comma causing the right serde to be chosen by print. Which version of KSQL are you using?

Here is the code that selects the format for 'print' on master: https://github.com/confluentinc/ksql/blob/master/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java

The presence of a comma in the string should not matter: the formatter will try to deserialize the value of the message as an avro record, and only choose the avro formatter if the deserialization succeeded.
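For illustration only, the selection logic described above might be sketched like this (hypothetical Python, not the actual TopicStream code; the real formatter attempts a genuine Avro deserialization via Schema Registry, whereas this sketch cheats by checking the Confluent wire-format magic byte):

```python
import json

def detect_format(value: bytes) -> str:
    """Guess a print format for a record value, roughly mirroring the
    avro -> json -> string fallback order described above."""
    # Confluent-serialized Avro starts with magic byte 0x00 plus a
    # 4-byte schema id (a shortcut; the real code deserializes fully).
    if len(value) > 5 and value[0] == 0:
        return "AVRO"
    try:
        if isinstance(json.loads(value), (dict, list)):
            return "JSON"
    except ValueError:
        pass
    return "STRING"

print(detect_format(b'{"name": "demo"}'))              # JSON
print(detect_format(b"Hello, Created for KSQL demo"))  # STRING
```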

Are you sure that the topic didn't have a mix of string and avro records?

authorjapps commented 5 years ago

Thanks @apurvam, and thanks for sharing the source code 👍. Looks interesting. That opens up a couple more scenarios for us to handle in our framework.

Are you sure that the topic didn't have a mix of string and avro records?

Ans - I have tested both cases.

This works well both with a comma and without a comma (with the String SerDe):

"Hello, Created for KSQL demo"
"Hello Created for KSQL demo"

ksql> print 'demo-ksql-2' from beginning;
Format:STRING
2/17/19 1:10:22 PM UTC , 1550409022399 , Hello, Created for KSQL demo
2/17/19 1:10:41 PM UTC , 1550409041040 , Hello Created for KSQL demo



It sounds like `String` records here are not recognized as Avro when read via the KSQL CLI. Whether a comma is present doesn't even seem relevant. That's my understanding.

Your explanation below, that it will still `deserialize the value of the message as an avro record`, is still concerning me. Please advise (there is a chance I am doing something wrong here).

> The presence of a comma in the string should not matter: the formatter will try to deserialize the value of the message as an avro record, and only choose the avro formatter if the deserialization succeeded.

authorjapps commented 5 years ago

When you get a chance, could you please throw some light on the below (reported earlier in this ticket)?

Over the REST call for the same command (see the screenshots above), the REST client stays on Loading... and doesn't return anything.

authorjapps commented 5 years ago

@apurvam, sorry for the late reply. I have now added the details on String and Avro records in reply to your earlier question.

Which version of KSQL are you using?

cp-ksql-server:5.1.0

Do you advise using any higher version than this?

I will keep the HTTP (REST) issue separate from this one by raising another ticket, as it seems to be mixed up with the CLI/Java-client issue.