confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL displays Avro data as STRING if Schema Registry connection fails #2293

Open rmoff opened 5 years ago

rmoff commented 5 years ago

When PRINTing Avro data, KSQL displays it as a STRING with no warning if the KSQL server cannot connect to the Schema Registry.

Schema Registry up:

ksql> PRINT 'RECA_DATA' FROM BEGINNING;
Format:AVRO
12/19/18 5:44:11 PM UTC, null, {"SOMEOTHERFIELD": "1.001", "SOMEFLD": "some data", "AFLD": "data", "ANOTHERFLD": "98.6"}
12/19/18 5:44:52 PM UTC, null, {"SOMEOTHERFIELD": "1.001", "SOMEFLD": "some data", "AFLD": "data", "ANOTHERFLD": "98.6"}

Schema Registry down:

ksql> PRINT 'RECA_DATA' FROM BEGINNING;
Format:STRING
12/19/18 5:43:51 PM UTC , NULL , \x00\x00\x00\x00\x15\x02\x0A1.001\x02\x12some data\x02\x08data\x02\x0898.6
12/19/18 5:44:01 PM UTC , NULL , \x00\x00\x00\x00\x15\x02\x0A1.001\x02\x12some data\x02\x08data\x02\x0898.6
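For reference, those leading `\x00\x00\x00\x00\x15` bytes are Confluent's Avro wire format, which is why the data is opaque without the registry (a minimal Python sketch; the byte values are copied from the dump above):

```python
import struct

# Confluent's Avro "wire format" for Kafka record values:
#   byte 0    : magic byte 0x00
#   bytes 1-4 : 4-byte big-endian Schema Registry schema ID
#   bytes 5.. : Avro-encoded payload
# Without the registry, KSQL cannot resolve the schema ID,
# so it falls back to printing the raw bytes as a STRING.
raw = b"\x00\x00\x00\x00\x15\x02\x0a1.001\x02\x12some data\x02\x08data\x02\x0898.6"

magic = raw[0]
schema_id = struct.unpack(">I", raw[1:5])[0]

print(magic)      # 0
print(schema_id)  # 21 -- the ID the topic's schema was registered under
```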

KSQL server log:

[2018-12-19 21:21:00,872] INFO stream-client [_confluent-ksql-confluent_rmoff_01query_CSAS_RECA_DATA_0-b9ee610f-31ab-4152-9639-c00ec43cd0ca] State transition from REBALANCING to ERROR (org.apache.kafka.streams.KafkaStreams:257)
[2018-12-19 21:21:00,872] WARN stream-client [_confluent-ksql-confluent_rmoff_01query_CSAS_RECA_DATA_0-b9ee610f-31ab-4152-9639-c00ec43cd0ca] All stream threads have died. The instance will be in error state and should be closed. (org.apache.kafka.streams.KafkaStreams:418)
[2018-12-19 21:21:00,872] INFO stream-thread [_confluent-ksql-confluent_rmoff_01query_CSAS_RECA_DATA_0-b9ee610f-31ab-4152-9639-c00ec43cd0ca-StreamThread-4] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1198)
Exception in thread "_confluent-ksql-confluent_rmoff_01query_CSAS_RECA_DATA_0-b9ee610f-31ab-4152-9639-c00ec43cd0ca-StreamThread-4" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=source_data, partition=0, offset=1968
   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
   at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing row to topic RECA_DATA using Converter API
Caused by: org.apache.kafka.connect.errors.DataException: RECA_DATA
   at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:78)
   at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:41)
   at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:24)
   at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:38)
   at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:24)
   at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
   at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
   at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
   at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.UnknownHostException: schema-registry
   at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
   at java.net.Socket.connect(Socket.java:589)
   at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
   at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
   at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
   at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
   at sun.net.www.http.HttpClient.New(HttpClient.java:339)
   at sun.net.www.http.HttpClient.New(HttpClient.java:357)
…

Expected behaviour: If KSQL identifies that the data is Avro, it should raise an error to the user when the Schema Registry is not reachable. At the moment it silently falls back to STRING, which is very confusing for users (especially since serialisation is often a poorly-understood area anyway).
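One way to surface this would be a pre-flight reachability check against the registry's REST endpoint before PRINT picks a fallback format. A minimal sketch (the helper name and URL are hypothetical, not ksqlDB's actual API):

```python
import urllib.request
import urllib.error

def schema_registry_reachable(url: str, timeout: float = 2.0) -> bool:
    # Hypothetical pre-flight check: try to open a connection to the
    # Schema Registry's REST endpoint. If this fails, PRINT could warn
    # the user instead of silently falling back to STRING formatting.
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except (urllib.error.URLError, OSError):
        return False

# e.g. schema_registry_reachable("http://schema-registry:8081")
```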

Related: #1553

big-andy-coates commented 4 years ago

It's not possible to look at raw bytes and 'know' that the data is Avro. However, we can warn if the Schema Registry is configured but not reachable when handling a PRINT statement.
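The most the bytes alone can support is a heuristic: Confluent-framed Avro always starts with a 0x00 magic byte, but any binary payload can begin with 0x00 by coincidence. A hypothetical sketch of such a check:

```python
def looks_like_confluent_avro(raw: bytes) -> bool:
    # Heuristic only: Confluent-framed Avro starts with a 0x00 magic byte
    # followed by a 4-byte schema ID, but arbitrary payloads can also start
    # with 0x00 -- hence this can justify a warning, never a guarantee.
    return len(raw) > 5 and raw[0] == 0x00

print(looks_like_confluent_avro(b"\x00\x00\x00\x00\x15\x02\x0a1.001"))  # True
print(looks_like_confluent_avro(b"plain text"))                         # False
```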