confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
128 stars 1.04k forks source link

NPE when PRINTing topic #3450

Closed rmoff closed 5 years ago

rmoff commented 5 years ago

5.4.0-beta1

ksql> print 'data_mqtt-ccloud-1569933773';
java.lang.NullPointerException

Server log:

[2019-10-01 12:43:58,275] INFO Printing topic 'data_mqtt-ccloud-1569933773' (io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource:217)
[2019-10-01 12:43:58,409] ERROR Exception encountered while writing to output stream (io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter:122)
java.lang.NullPointerException
  at com.fasterxml.jackson.databind.node.ObjectNode.setAll(ObjectNode.java:408)
  at io.confluent.ksql.rest.server.resources.streaming.TopicStream$Format$3$1.print(TopicStream.java:182)
  at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.lambda$format$1(TopicStream.java:76)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.Iterator.forEachRemaining(Iterator.java:116)
  at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
  at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
  at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.format(TopicStream.java:83)
  at io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter.write(TopicStreamWriter.java:100)
  at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:79)
  at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:61)
  at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:266)
  at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:251)
  at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
  at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:109)
  at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
  at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:85)
  at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
  at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1135)
  at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:662)
  at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:395)
  at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:385)
  at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:280)
  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
  at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
  at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
  at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
  at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
  at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
  at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
  at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
  at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
  at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
  at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
  at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
  at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
  at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)

Sample data:

{"batt":100,"lon":-122.410331480319,"acc":200,"p":100.79250335693359,"bs":2,"vel":0,"vac":10,"lat":37.786102294921875,"t":"u","conn":"w","tst":1569933739,"alt":23,"_type":"location","tid":"RM"}
vcrfxia commented 5 years ago

Attempted to reproduce this but was unsuccessful. Please advise on where I've gone wrong?

Spin up Kafka from CP 5.3.0, create topic and produce record:

$ ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic data_mqtt-ccloud-1569933773
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic data_mqtt-ccloud-1569933773.
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic data_mqtt-ccloud-1569933773
>{"batt":100,"lon":-122.410331480319,"acc":200,"p":100.79250335693359,"bs":2,"vel":0,"vac":10,"lat":37.786102294921875,"t":"u","conn":"w","tst":1569933739,"alt":23,"_type":"location","tid":"RM"}
>^C

Start KSQL server and CLI from master (or 5.4.x-beta, I tried both), issue print topic request:

ksql> print 'data_mqtt-ccloud-1569933773' from beginning;
Format:JSON
{"ROWTIME":1569955768777,"ROWKEY":"null","batt":100,"lon":-122.410331480319,"acc":200,"p":100.7925033569336,"bs":2,"vel":0,"vac":10,"lat":37.786102294921875,"t":"u","conn":"w","tst":1569933739,"alt":23,"_type":"location","tid":"RM"}
^C

and the message comes through as expected.

rmoff commented 5 years ago

¯_(ツ)_/¯ let me know what other debug I can provide. The data sample I've provided is just one of many on the topic.

vcrfxia commented 5 years ago

Are you able to consume data from the topic with kafka-console-consumer? If so, can you share the output?

Also, are you able to print other topics through KSQL? (Would be good to sanity check that the issue is specific to this topic/its data, and not a more general issue.)