authorjapps / zerocode

A community-developed, free, open-source automated testing framework for microservices APIs, Kafka (data streams) and load testing. Zerocode Open Source enables you to create, change and maintain your automated test scenarios via simple JSON or YAML files. Visit the documentation below:
https://zerocode-tdd.tddfy.com
Apache License 2.0

Kafka - KSQL testing in action #187

Open authorjapps opened 5 years ago

authorjapps commented 5 years ago

Open issue- https://github.com/confluentinc/ksql/issues/2126

rmoff commented 5 years ago

@authorjapps please can you describe what your issue is? And provide details about your environment—versions, deployment approach e.g. Docker or not, etc.

authorjapps commented 5 years ago

Hello @rmoff mate, thanks for coming back on this.

Issue-1

I faced a similar issue while bringing up ksql-cli. See the error message below:

$ docker exec -it compose_ksql-cli_1 bash
root@692642cfa0cb:/# ksql

                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.1.0, Server v<unknown> located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

**************** ERROR ********************
Remote server address may not be valid.
Address: http://localhost:8088
Error issuing GET to KSQL server. path:/info
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
*******************************************
ksql> 
ksql> list topics;
Error issuing POST to KSQL server. path:ksql
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
ksql> exit
Exiting KSQL.

At the same time, list topics (or show topics) passed when queried directly from ksql-server via HTTP. See the output below (test steps DSL are here).

***Step PASSED:Consume via KSQL query->load_kafka
2019-01-17 21:32:40,305 [main] INFO  
--------- TEST-STEP-CORRELATION-ID: 0f91dca4-7300-4a7c-83fd-7056c8225c6f ---------
*requestTimeStamp:2019-01-17T21:32:38.621
step:load_kafka
url:kafka-topic:demo-ksql
method:load
request:
{
  "records" : [ {
    "key" : "1547760758613",
    "value" : "Hello, Created for KSQL demo"
  } ]
} 
--------- TEST-STEP-CORRELATION-ID: 0f91dca4-7300-4a7c-83fd-7056c8225c6f ---------
Response:
{
  "status" : "Ok",
  "recordMetadata" : {
    "offset" : 0,
    "timestamp" : 1547760760264,
    "serializedKeySize" : 13,
    "serializedValueSize" : 34,
    "topicPartition" : {
      "hash" : 749715182,
      "partition" : 0,
      "topic" : "demo-ksql"
    }
  }
}
*responseTimeStamp:2019-01-17T21:32:40.298 
*Response delay:1677.0 milli-secs 
---------> Assertion: <----------
{
  "status" : "Ok"
} 
-done-

2019-01-17 21:32:40,306 [main] INFO  org.jsmart.zerocode.core.runner.ZeroCodeMultiStepsScenarioRunnerImpl - 
Overridden the header key:Accept, with value:application/vnd.ksql.v1+json

***Step PASSED:Consume via KSQL query->ksql_show_topics

--------- TEST-STEP-CORRELATION-ID: 5f9ea949-1f37-41b8-a157-6d69b334c023 ---------
*requestTimeStamp:2019-01-17T21:32:40.306
step:ksql_show_topics
url:http://localhost:8088/ksql
method:POST
request:
{
  "headers" : {
    "Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
    "Accept" : "application/vnd.ksql.v1+json"
  },
  "body" : {
    "ksql" : "SHOW TOPICS;",
    "streamsProperties" : { }
  }
} 
--------- TEST-STEP-CORRELATION-ID: 5f9ea949-1f37-41b8-a157-6d69b334c023 ---------
Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 21:32:40 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Transfer-Encoding" : [ "chunked" ],
    "Vary" : [ "Accept-Encoding, User-Agent" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "body" : [ {
    "@type" : "kafka_topics",
    "statementText" : "SHOW TOPICS;",
    "topics" : [ {
      "name" : "_schemas",
      "registered" : false,
      "replicaInfo" : [ 1 ],
      "consumerCount" : 0,
      "consumerGroupCount" : 0
    }, {
      "name" : "demo-ksql",
      "registered" : false,
      "replicaInfo" : [ 1 ],
      "consumerCount" : 0,
      "consumerGroupCount" : 0
    } ]
  } ]
}
*responseTimeStamp:2019-01-17T21:32:40.670 
*Response delay:364.0 milli-secs 
---------> Assertion: <----------
{
  "status" : 200,
  "body" : [ {
    "topics.SIZE" : "$GT.0",
    "topics[?(@.name=='demo-ksql')].registered.SIZE" : 1
  } ]
} 
-done-
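
For readers who want to reproduce the ksql_show_topics step outside zerocode, here is a minimal sketch of the same HTTP call. The URL, headers and body are copied from the log above. The function names (build_ksql_request, topic_names) are hypothetical helpers introduced only for illustration. The request is built but not sent, so the snippet runs without a KSQL server.

```python
import json
import urllib.request

# Value of kafka-ksql-server-fqdn used in this thread.
KSQL_SERVER = "http://localhost:8088"


def build_ksql_request(statement, server=KSQL_SERVER):
    """Build (but do not send) the POST /ksql request seen in the log."""
    payload = {"ksql": statement, "streamsProperties": {}}
    return urllib.request.Request(
        url=server + "/ksql",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
            "Accept": "application/vnd.ksql.v1+json",
        },
        method="POST",
    )


def topic_names(response_body):
    """Collect topic names from a parsed SHOW TOPICS response body."""
    return [t["name"] for entry in response_body for t in entry.get("topics", [])]


request = build_ksql_request("SHOW TOPICS;")

# To actually send it (requires a reachable KSQL server):
# with urllib.request.urlopen(request) as resp:
#     print(topic_names(json.loads(resp.read())))
```

Note that localhost:8088 only resolves to the KSQL server from the host machine. From inside another container the server's service name has to be used instead.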

The schema registry and bootstrap server configs are below:

kafka.bootstrap.servers=localhost:9092
kafka.producer.properties=kafka_servers/kafka_producer_avro.properties
kafka.consumer.properties=kafka_servers/kafka_consumer_avro.properties
# Kafka REST Proxy end point for sending avro messages
web.application.endpoint.host=http://localhost
web.application.endpoint.port=8082
web.application.endpoint.context=
# URL of Kafka KSQL server
kafka-ksql-server-fqdn=http://localhost:8088

The test case is below to run directly.

Note: It seems the issue has been closed at your end, but I haven't had a chance to look at it with the fresh instructions you provided. I will keep you posted here once I rerun.

Issue-2

When I run the following directly against the ksql-server via an HTTP call, it throws an error.

print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord

Is it because of non-JSON content?

See log here:

--------- TEST-STEP-CORRELATION-ID: 34ce0a62-8811-4a3b-8892-9eb3d5cf1d7c ---------
*requestTimeStamp:2019-01-17T22:01:15.180
step:ksql_print_records
url:http://localhost:8088/query
method:POST
request:
{
  "headers" : {
    "Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
    "Accept" : "application/vnd.ksql.v1+json"
  },
  "body" : {
    "ksql" : "print 'demo-ksql' from beginning;",
    "streamsProperties" : { }
  }
} 
--------- TEST-STEP-CORRELATION-ID: 34ce0a62-8811-4a3b-8892-9eb3d5cf1d7c ---------
Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Transfer-Encoding" : [ "chunked" ],
    "Vary" : [ "Accept-Encoding, User-Agent" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}
*responseTimeStamp:2019-01-17T22:01:16.057 
*Response delay:877.0 milli-secs 
---------> Assertion: <----------
{
  "status" : 200,
  "body" : "$NOT.NULL"
} 
-done-

java.lang.RuntimeException: Assertion failed for :- 

[Consume via KSQL query] 
    |
    |
    +---Step --> [ksql_print_records] 

Failures:
--------- 
Assertion path '$.body' with actual value 'null' did not match the expected value 'NOT NULL'
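
The log shows the error text under rawBody while the assertion reports $.body as null. A plausible reading, which is an assumption about the framework's behavior and not taken from its source, is that a response payload that cannot be parsed as JSON is kept only as raw text. The sketch below imitates that split. split_body is a hypothetical helper, not a zerocode API.

```python
import json


def split_body(raw_text):
    """Return {'body': ...} for JSON text, {'rawBody': ...} for anything else."""
    try:
        return {"body": json.loads(raw_text)}
    except json.JSONDecodeError:
        return {"rawBody": raw_text}


# The payload returned by POST /query in the log above.
response = {"status": 200}
response.update(split_body(
    "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
))

# 'body' is absent, so a NOT NULL check on $.body sees null
# even though the status is 200.
body_is_null = response.get("body") is None
```

Under this assumption the step fails on the body assertion and not on the status, which matches the failure message above.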

Note: I haven't tried this via ksql-cli yet; I will update you here after trying.

The test step executed for the above result is below:

        {
            "name": "ksql_print_records",
            "url": "${kafka-ksql-server-fqdn}/query",
            "operation": "POST",
            "request": {
                "headers": {
                    "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
                    "Accept": "application/vnd.ksql.v1+json"
                },
                "body": {
                    "ksql": "print 'demo-ksql' from beginning;",
                    "streamsProperties": {}
                }
            },
            "assertions": {
                "status": 200,
                "body": "$NOT.NULL"
            }
        }

rmoff commented 5 years ago

Re. the CLI, you need to provide the KSQL Server address for it to connect to. So, based on https://github.com/authorjapps/zerocode/blob/master/docker/compose/kafka-schema-registry.yml, your test should be invoked as ksql http://ksql-server:8088, not just ksql.

For java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord check the KSQL server log; it sounds like there may be something up with the message payload. Delimited (CSV), JSON, and Avro are all supported.

@authorjapps wrote: Thanks a lot. That worked 👍 . Issue-1 sorted.

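root@692642cfa0cb:/# ksql http://ksql-server:8088
CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088

ksql> list topics;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 1                  | 0         | 0
 demo-ksql   | false      | 1          | 1                  | 1         | 1
-----------------------------------------------------------------------------------------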

authorjapps commented 5 years ago

KSQL server log

However, the response status code is 200 in the server log:

[2019-01-18 06:16:16,093] INFO 172.31.0.3 - - [18/Jan/2019:06:16:15 +0000] "POST /query HTTP/1.1" 200 73 573 (io.confluent.rest-utils.requests:60)

[2019-01-18 06:16:10,916] INFO Building AST for list topics;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-18 06:16:10,972] INFO 172.31.0.3 - - [18/Jan/2019:06:16:10 +0000] "POST /ksql HTTP/1.1" 200 265  66 (io.confluent.rest-utils.requests:60)
[2019-01-18 06:16:15,522] INFO Building AST for print 'demo-ksql' from beginning;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-18 06:16:15,558] INFO Printing topic 'demo-ksql' (io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource:132)
[2019-01-18 06:16:15,579] ERROR Exception encountered while writing to output stream (io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter:112)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
    at io.confluent.ksql.rest.server.resources.streaming.TopicStream$Format$2$1.print(TopicStream.java:132)
    at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.lambda$format$1(TopicStream.java:75)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.format(TopicStream.java:82)
    at io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter.write(TopicStreamWriter.java:95)
    at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:79)
    at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:61)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:266)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:251)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
    at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:109)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
    at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:85)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
    at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1135)
    at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:662)
    at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:395)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:385)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:280)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
    at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1340)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1242)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:740)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:503)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
    at java.lang.Thread.run(Thread.java:748)
[2019-01-18 06:16:16,093] INFO 172.31.0.3 - - [18/Jan/2019:06:16:15 +0000] "POST /query HTTP/1.1" 200 73  573 (io.confluent.rest-utils.requests:60)

KSQL cli output

root@692642cfa0cb:/# date
Fri Jan 18 06:07:46 UTC 2019
root@692642cfa0cb:/# ksql http://ksql-server:8088

                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> list topics;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 1                  | 0         | 0              
 demo-ksql   | false      | 1          | 1                  | 1         | 1              
-----------------------------------------------------------------------------------------
ksql> print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
ksql> 

Kafka console-consumer

$ docker exec compose_kafka_1  kafka-console-consumer --bootstrap-server localhost:29092 --topic demo-ksql  --from-beginning
8Hello, Created for KSQL demo
8Hello, Created for KSQL demo
Hello World
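
One possible explanation for the leading 8 on the first two records, offered as an assumption and not confirmed anywhere in this thread: those records may have been written by an Avro serializer (the config above points kafka.producer.properties at kafka_producer_avro.properties) using a plain string schema. Avro writes a string as a zigzag varint length followed by the UTF-8 bytes, and the Confluent wire format prepends a zero magic byte and a 4-byte schema ID. The sketch below works through the arithmetic. The schema ID value is a placeholder.

```python
import struct


def zigzag_varint(n):
    """Encode an integer the way Avro encodes int and long values."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small codes
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def avro_string(text):
    """Avro binary encoding of a string: length prefix, then UTF-8 bytes."""
    data = text.encode("utf-8")
    return zigzag_varint(len(data)) + data


def confluent_frame(payload, schema_id):
    """Confluent wire format: magic byte 0, 4-byte big-endian schema ID, payload."""
    return struct.pack(">bI", 0, schema_id) + payload


value = "Hello, Created for KSQL demo"              # 28 characters
encoded = avro_string(value)                        # length 28 -> byte 0x38, ASCII '8'
framed = confluent_frame(encoded, schema_id=1)      # schema ID 1 is a placeholder
```

Here encoded starts with the byte for '8', and the framed value is 34 bytes long, the same as serializedValueSize in the first load_kafka response. If this reading is right, the Avro deserializer would hand KSQL a plain String where its Avro formatter expects a GenericRecord, which would be consistent with the ClassCastException in the server log. The later "Hello World" record (serializedValueSize 11) would then be an unframed plain string.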

:::Note:::

I am able to consume via the Java client (via JUnit) perfectly fine, see below. The last step has the records; it consumed only one record because the consumer had already committed up to the previous offset.

***Step PASSED:Unload - consume a message from kafka->load_kafka
2019-01-18 06:21:01,431 [main] INFO  
--------- TEST-STEP-CORRELATION-ID: 31229a85-ece9-486c-b39e-7f5181c2ecc7 ---------
*requestTimeStamp:2019-01-18T06:21:00.808
step:load_kafka
url:kafka-topic:demo-ksql
method:load
request:
{
  "records" : [ {
    "key" : "1547792460796",
    "value" : "Hello World"
  } ]
} 
--------- TEST-STEP-CORRELATION-ID: 31229a85-ece9-486c-b39e-7f5181c2ecc7 ---------
Response:
{
  "status" : "Ok",
  "recordMetadata" : {
    "offset" : 3,
    "timestamp" : 1547792461364,
    "serializedKeySize" : 13,
    "serializedValueSize" : 11,
    "topicPartition" : {
      "hash" : 749715182,
      "partition" : 0,
      "topic" : "demo-ksql"
    }
  }
}
*responseTimeStamp:2019-01-18T06:21:01.422 
*Response delay:614.0 milli-secs 
---------> Assertion: <----------
{
  "status" : "Ok"
} 
-done-

2019-01-18 06:21:01,433 [main] INFO  
---------------------------------------------------------
kafka.bootstrap.servers - localhost:9092
---------------------------------------------------------

2019-01-18 06:21:01,456 [main] INFO  org.jsmart.zerocode.core.kafka.receive.KafkaReceiver - 
### Kafka Consumer Effective configs:ConsumerLocalConfigs{recordType='null', fileDumpTo='target/temp/demo.txt', commitAsync=false, commitSync=true, showRecordsConsumed=true, maxNoOfRetryPollsOrTimeouts=5, pollingTime=1000, seek=null}

2019-01-18 06:21:01,467 [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = consumerGroup14
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 2
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper - 
Record Key - 1547792460796 , Record value - Hello World, Record partition - 0, Record offset - 3

2019-01-18 06:21:10,609 [main] INFO  org.jsmart.zerocode.core.runner.StepNotificationHandler - 
***Step PASSED:Unload - consume a message from kafka->onload_kafka

--------- TEST-STEP-CORRELATION-ID: 81e13c39-588e-451d-b8e9-f8677c389c1c ---------
*requestTimeStamp:2019-01-18T06:21:01.433
step:onload_kafka
url:kafka-topic:demo-ksql
method:unload
request:
{ } 
--------- TEST-STEP-CORRELATION-ID: 81e13c39-588e-451d-b8e9-f8677c389c1c ---------
Response:
{
  "records" : [ {
    "topic" : "demo-ksql",
    "partition" : 0,
    "offset" : 3,
    "timestamp" : 1547792461364,
    "timestampType" : "CREATE_TIME",
    "serializedKeySize" : 13,
    "serializedValueSize" : 11,
    "headers" : {
      "headers" : [ ],
      "isReadOnly" : false
    },
    "key" : "1547792460796",
    "value" : "Hello World",
    "leaderEpoch" : {
      "value" : 0
    }
  } ],
  "size" : 1
}
*responseTimeStamp:2019-01-18T06:21:10.607 
*Response delay:9174.0 milli-secs 
---------> Assertion: <----------
{
  "size" : "$GT.0"
} 
-done-

2019-01-18 06:21:10,730 [main] INFO  org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner - 
**FINISHED executing all Steps for [Unload - consume a message from kafka] **.

Steps were:[load_kafka, onload_kafka]

The records here were raw text messages. Please let me know if you want me to try with JSON messages. I am happy to share the same workflow logs.

:::Another thing to note:::-

rmoff commented 5 years ago

You can put what you want onto a Kafka topic, and using the Consumer API you can read what you want. KSQL is a higher level abstraction, and as such requires the data to be in a format that it expects, namely, CSV, JSON, or Avro.

However, I can't reproduce the error you're getting with just the data you've shown above:

ksql> PRINT 'test' FROM BEGINNING;
Format:STRING
1/18/19 12:22:07 PM UTC , NULL , foo,bar
1/18/19 12:22:29 PM UTC , NULL , Hello world
1/18/19 12:22:58 PM UTC , NULL , 8Hello, Created for KSQL demo
1/18/19 12:23:10 PM UTC , NULL , 8Hello, Created for KSQL demo
1/18/19 12:23:17 PM UTC , NULL , Hello World

What I would suggest is that you either just put a simple JSON message on the topic, or if you want to debug the current error further, use kafkacat to dump the full message and raise it as a bug over on the KSQL project https://github.com/confluentinc/ksql/issues/new

e.g.

kafkacat -b kafka:29092 -t test -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'

Key (-1 bytes):
Value (7 bytes): foo,bar
Timestamp: 1547814127770        Partition: 0    Offset: 0
--

Key (-1 bytes):
Value (11 bytes): Hello world
Timestamp: 1547814149934        Partition: 0    Offset: 1
--

Key (-1 bytes):
Value (29 bytes): 8Hello, Created for KSQL demo
Timestamp: 1547814178652        Partition: 0    Offset: 2
--

Key (-1 bytes):
Value (29 bytes): 8Hello, Created for KSQL demo
Timestamp: 1547814190700        Partition: 0    Offset: 3
--

Key (-1 bytes):
Value (11 bytes): Hello World
Timestamp: 1547814197870        Partition: 0    Offset: 4
--

rmoff commented 5 years ago

Feel free to head over to http://cnfl.io/slack and the #ksql channel for more help on this if you want.

authorjapps commented 5 years ago

Thanks mate, it's on the ksql channel now 👍. Let's see if anyone can help us; otherwise, one of our team will raise it as a bug in KSQL if that helps in solving the problem.

I was also thinking that, for an error response from the Kafka-KSQL server over a REST call, 200 doesn't sound right.

A 400(+) or 500(+) status would be better, to signal that the call did not succeed.

Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}

I will try with a JSON message and come back here on this.

Thanks for your support 🙏

authorjapps commented 5 years ago

https://github.com/confluentinc/ksql/issues/2386 has been raised now.