confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Improve error message for reading from standbys when standby reads are disabled #8088

Open colebaileygit opened 3 years ago

colebaileygit commented 3 years ago

Describe the bug Simple pull query fails when executing on node A but works on node B. (No standby replicas active)

To Reproduce

Expected behavior Both nodes should return the same result (node A calls node B and forwards result to user)

Actual behaviour Node A returns the error "Exhausted standby hosts to try.", but the logs reveal the real error (see context below)

Additional context JSON API response from node B:

[{"header":{"queryId":"query_1630497378619","schema":"`STATUS` STRING KEY, `VERSION` INTEGER"}},
{"row":{"columns":["in-progress",0]}}]
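For reference, a response of this shape could be requested and unpacked roughly as follows. This is a minimal sketch, not the reporter's actual reproduction: the table name and query text are placeholders (the report does not show them), and the host URL in the usage note is an assumption. It relies on the ksqlDB REST `/query` endpoint, which the logs below also show being called (`path:/query`).

```python
import json
import urllib.request

# Hypothetical query text: the original report does not show the statement used.
PULL_QUERY = "SELECT * FROM MY_TABLE WHERE STATUS = 'in-progress';"


def build_pull_query_request(base_url: str, ksql: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for the ksqlDB /query endpoint."""
    body = json.dumps({"ksql": ksql, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/query",
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


def parse_pull_query_response(payload: str):
    """Split a complete /query response into its schema string and its rows."""
    schema, rows = None, []
    for item in json.loads(payload):
        if "header" in item:
            schema = item["header"]["schema"]
        elif "row" in item:
            rows.append(item["row"]["columns"])
    return schema, rows


# The response body quoted in this report, used here as sample input.
SAMPLE = (
    '[{"header":{"queryId":"query_1630497378619",'
    '"schema":"`STATUS` STRING KEY, `VERSION` INTEGER"}},\n'
    '{"row":{"columns":["in-progress",0]}}]'
)
```

Sending the request would look like `urllib.request.urlopen(build_pull_query_request("http://localhost:8088", PULL_QUERY))`, once per node; the expectation described above is that both nodes return the same schema and rows.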

Full error logs:

[2021-09-01 11:52:03,539] ERROR Error while handling chunk (io.confluent.ksql.rest.client.KsqlTarget:312)
io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:53)
    at io.confluent.ksql.rest.client.KsqlTarget.toRows(KsqlTarget.java:454)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$postQueryRequest$3(KsqlTarget.java:197)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$null$11(KsqlTarget.java:310)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:232)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$beginResponse$0(Http1xClientConnection.java:486)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.handleChunk(Http1xClientConnection.java:322)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.access$1900(Http1xClientConnection.java:242)
    at io.vertx.core.http.impl.Http1xClientConnection.handleResponseChunk(Http1xClientConnection.java:631)
    at io.vertx.core.http.impl.Http1xClientConnection.handleHttpMessage(Http1xClientConnection.java:601)
    at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:577)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing '"' for name
 at [Source: (byte[])"{"header":{""; line: 1, column: 13] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["header"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1719)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:530)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3275)
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:51)
    ... 39 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: was expecting closing '"' for name
 at [Source: (byte[])"{"header":{""; line: 1, column: 13]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:664)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.slowParseName(UTF8StreamJsonParser.java:1885)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1676)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:758)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:155)
    at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:185)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
    ... 46 more
[2021-09-01 11:52:03,540] WARN Error forwarding query to node http://10.58.62.211:8088/. Falling back to standby state which may return stale results (io.confluent.ksql.physical.pull.HARouting:294)
io.confluent.ksql.rest.client.KsqlRestClientException: Error issuing POST to KSQL server. path:/query
    at io.confluent.ksql.rest.client.KsqlTarget.executeSync(KsqlTarget.java:388)
    at io.confluent.ksql.rest.client.KsqlTarget.executeRequestSync(KsqlTarget.java:305)
    at io.confluent.ksql.rest.client.KsqlTarget.post(KsqlTarget.java:271)
    at io.confluent.ksql.rest.client.KsqlTarget.postQueryRequest(KsqlTarget.java:192)
    at io.confluent.ksql.rest.server.services.DefaultKsqlClient.makeQueryRequest(DefaultKsqlClient.java:129)
    at io.confluent.ksql.physical.pull.HARouting.forwardTo(HARouting.java:331)
    at io.confluent.ksql.physical.pull.HARouting.executeOrRouteQuery(HARouting.java:290)
    at io.confluent.ksql.physical.pull.HARouting.lambda$executeRounds$2(HARouting.java:173)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at io.confluent.ksql.rest.client.KsqlTarget.executeSync(KsqlTarget.java:386)
    ... 11 more
Caused by: io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:53)
    at io.confluent.ksql.rest.client.KsqlTarget.toRows(KsqlTarget.java:454)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$postQueryRequest$3(KsqlTarget.java:197)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$null$11(KsqlTarget.java:310)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:232)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$beginResponse$0(Http1xClientConnection.java:486)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.handleChunk(Http1xClientConnection.java:322)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.access$1900(Http1xClientConnection.java:242)
    at io.vertx.core.http.impl.Http1xClientConnection.handleResponseChunk(Http1xClientConnection.java:631)
    at io.vertx.core.http.impl.Http1xClientConnection.handleHttpMessage(Http1xClientConnection.java:601)
    at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:577)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing '"' for name
 at [Source: (byte[])"{"header":{""; line: 1, column: 13] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["header"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1719)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:530)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3275)
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:51)
    ... 39 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: was expecting closing '"' for name
 at [Source: (byte[])"{"header":{""; line: 1, column: 13]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:664)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.slowParseName(UTF8StreamJsonParser.java:1885)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1676)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:758)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:155)
    at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:185)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
    ... 46 more
[2021-09-01 11:52:03,540] ERROR Error while handling chunk (io.confluent.ksql.rest.client.KsqlTarget:312)
io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:53)
    at io.confluent.ksql.rest.client.KsqlTarget.toRows(KsqlTarget.java:454)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$postQueryRequest$3(KsqlTarget.java:197)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$null$11(KsqlTarget.java:310)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:232)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$beginResponse$0(Http1xClientConnection.java:486)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.handleChunk(Http1xClientConnection.java:322)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.access$1900(Http1xClientConnection.java:242)
    at io.vertx.core.http.impl.Http1xClientConnection.handleResponseChunk(Http1xClientConnection.java:631)
    at io.vertx.core.http.impl.Http1xClientConnection.handleHttpMessage(Http1xClientConnection.java:601)
    at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:577)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queryId': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"queryId":"query_1630497123535","schema":"`STATUS` STRING KEY, `VERSION` INTEGER"}}"; line: 1, column: 9]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3560)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2655)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:857)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4356)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4205)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3275)
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:51)
    ... 41 more
[2021-09-01 11:52:03,540] ERROR Exception occurred while writing to connection stream:  (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:124)
io.confluent.ksql.execution.streams.materialization.MaterializationException: Exhausted standby hosts to try.
    at io.confluent.ksql.physical.pull.HARouting.groupByHost(HARouting.java:225)
    at io.confluent.ksql.physical.pull.HARouting.executeRounds(HARouting.java:165)
    at io.confluent.ksql.physical.pull.HARouting.lambda$handlePullQuery$1(HARouting.java:128)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2021-09-01 11:52:09,506] ERROR Error while handling chunk (io.confluent.ksql.rest.client.KsqlTarget:312)
io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:53)
    at io.confluent.ksql.rest.client.KsqlTarget.toRows(KsqlTarget.java:454)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$postQueryRequest$3(KsqlTarget.java:197)
    at io.confluent.ksql.rest.client.KsqlTarget.lambda$null$11(KsqlTarget.java:310)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:232)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$beginResponse$0(Http1xClientConnection.java:486)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.handleChunk(Http1xClientConnection.java:322)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.access$1900(Http1xClientConnection.java:242)
    at io.vertx.core.http.impl.Http1xClientConnection.handleResponseChunk(Http1xClientConnection.java:631)
    at io.vertx.core.http.impl.Http1xClientConnection.handleHttpMessage(Http1xClientConnection.java:601)
    at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:577)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing '"' for name
 at [Source: (byte[])"{"header":{""; line: 1, column: 13] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["header"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1719)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:530)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3275)
    at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:51)
    ... 39 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: was expecting closing '"' for name
 at [Source: (byte[])"{"header":{""; line: 1, column: 13]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:664)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.slowParseName(UTF8StreamJsonParser.java:1885)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1676)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:758)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:155)
    at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:185)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
    ... 46 more
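
One possible reading of the traces above: the two `byte[]` sources quoted by Jackson (`{"header":{"` and `queryId":"query_...` onwards) look like two halves of a single header row, each parsed on its own. The sketch below only illustrates that general failure mode in Python. It is not the ksqlDB client code, the split point is an assumption taken from the quoted sources, and it does not claim to be the confirmed root cause.

```python
import json

# Fragments modelled on the byte[] sources quoted in the stack traces above.
# Treating them as two consecutive network chunks is an assumption.
CHUNKS = [
    b'{"header":{"',
    b'queryId":"query_1630497123535","schema":"`STATUS` STRING KEY, `VERSION` INTEGER"}}',
]


def parse_each_chunk(chunks):
    """Naive approach: treat every chunk as one complete JSON document."""
    errors = []
    for chunk in chunks:
        try:
            json.loads(chunk)
        except json.JSONDecodeError as exc:
            errors.append(str(exc))
    return errors


def parse_buffered(chunks):
    """Accumulate chunks and emit a row only once the buffer parses cleanly.

    Retrying on every decode error is a crude completeness check; a real
    streaming client would rely on an explicit row delimiter instead.
    """
    buffer = b""
    rows = []
    for chunk in chunks:
        buffer += chunk
        try:
            rows.append(json.loads(buffer))
        except json.JSONDecodeError:
            continue  # incomplete so far, wait for more bytes
        buffer = b""
    return rows
```

With these inputs, `parse_each_chunk(CHUNKS)` reports one error per fragment, while `parse_buffered(CHUNKS)` yields a single header row once both fragments have arrived.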
colebaileygit commented 3 years ago

Seems to go away if this is disabled: KSQL_KSQL_HEARTBEAT_ENABLE

maksymilian-gasztych commented 3 years ago

I have a similar problem when I run ksqlDB in a cluster. Pull queries against a table like the one below terminate without returning any results, but if I remove WITH(key_format='JSON') everything works fine.

            CREATE TABLE SUM_A WITH(key_format='JSON')  AS
                SELECT
                    id,
                    SUM(x)
                FROM A
                GROUP BY x
                EMIT CHANGES;

patrickstuedi commented 3 years ago

@colebaileygit did you also observe this on a more recent version of ksqlDB, or only on 0.19.0? I'm still trying to reproduce it.

colebaileygit commented 3 years ago

@patrickstuedi I have since disabled the KSQL_KSQL_HEARTBEAT_ENABLE setting, which made the issue disappear. From memory, I was able to reproduce it locally with a docker-compose setup using 2 ksqlDB nodes, but I only tested with 0.19.0.

colebaileygit commented 3 years ago

It seems to be the unique combination of KSQL_KSQL_HEARTBEAT_ENABLE: true with no standby replicas set up. If standby replicas >= number of nodes, the error does not occur either.

patrickstuedi commented 3 years ago

I'm confused about the setup. You say you have no standbys set up, but you also say the expected behavior is that node A forwards to node B, which would only happen if node B is a standby. And the exception indicates that the partition request is forwarded, so a standby must be configured. Can you describe your setup, maybe sharing the configs you use for the two nodes?

colebaileygit commented 3 years ago

@patrickstuedi I've prepared a simple repo to reproduce the issue, hopefully this sheds some light on the issue: https://github.com/colebaileygit/ksqldb-demos/tree/master/multi-node-pull-query

Please note that the expected behavior can be produced by changing to KSQL_KSQL_HEARTBEAT_ENABLE: 'false' in the docker compose file.

colebaileygit commented 3 years ago

As a follow-up, I've also tested a 3-node cluster with KSQL_KSQL_HEARTBEAT_ENABLE: 'true' and KSQL_KSQL_STREAMS_NUM_STANDBY_REPLICAS: 1, and it also seems to have trouble when the query is routed to non-active nodes.

patrickstuedi commented 3 years ago

Can you share/attach the configs you used for each of the servers?

agavra commented 3 years ago

@patrickstuedi it looks like it's here: https://github.com/colebaileygit/ksqldb-demos/blob/master/multi-node-pull-query/docker-compose.yml

patrickstuedi commented 3 years ago

Ah right, @colebaileygit sorry I missed your earlier comment with the link. Thanks @agavra for pointing it out.

patrickstuedi commented 3 years ago

Sorry for the delayed response on this. I ran your setup and could reproduce the error. Then I added a few extra config properties, after which queries against both the active and the standby succeeded. Here is the modified config I used:

  ksql-1:
    image: confluentinc/ksqldb-server:0.19.0
    ports:
      - "8088:8088"
    depends_on:
      - kafka
    volumes:
      - ./sql:/home/appuser/sql
    environment:
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_SERVICE_ID: ksql-local_
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_PERSISTENCE_DEFAULT_FORMAT_KEY: AVRO
      KSQL_KSQL_PERSISTENCE_DEFAULT_FORMAT_VALUE: AVRO
      KSQL_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KSQL_KSQL_HEARTBEAT_ENABLE: 'true'
      KSQL_KSQL_QUERY_PULL_ENABLE_STANDBY_READS: "true"
      KSQL_KSQL_LAG_REPORTING_ENABLE: "true"
      KSQL_KSQL_STREAMS_NUM_STANDBY_REPLICAS: 1

  ## For testing clustering
  ksql-2:
    image: confluentinc/ksqldb-server:0.19.0
    ports:
      - "8089:8089"
    depends_on:
      - kafka
    volumes:
      - ./sql:/home/appuser/sql
    environment:
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_SERVICE_ID: ksql-local_
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_PERSISTENCE_DEFAULT_FORMAT_KEY: AVRO
      KSQL_KSQL_PERSISTENCE_DEFAULT_FORMAT_VALUE: AVRO
      KSQL_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KSQL_KSQL_HEARTBEAT_ENABLE: 'true'
      KSQL_KSQL_QUERY_PULL_ENABLE_STANDBY_READS: "true"
      KSQL_KSQL_LAG_REPORTING_ENABLE: "true"
      KSQL_KSQL_STREAMS_NUM_STANDBY_REPLICAS: 1

The main change is:

Other changes:

My understanding of the error is that without those properties set, the second node comes up as a standby (because it's configured with the same service id), but because the replicas and lag reporting properties are not set, the standby is not properly integrated. The error message above indicates that the query is forwarded but then cannot be properly deserialized. The failure might occur because the standby receives the query but expected a heartbeat, or the other way round; that's just a guess and we'll need to figure it out. Clearly, the error message is confusing and not particularly helpful.
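
To make the "confusing error message" point concrete, here is a deliberately simplified sketch of the kind of routing decision being discussed. Everything in it is hypothetical: `Host`, `hosts_to_try`, and `route_pull_query` are illustrative names, and this is not how `HARouting` is implemented. It only shows how a final error could carry both the underlying forwarding failure and a hint that standby reads are disabled, rather than a bare "Exhausted standby hosts to try."

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Host:
    """Hypothetical stand-in for a ksqlDB node hosting a partition."""
    name: str
    is_active: bool


def hosts_to_try(hosts: List[Host], standby_reads_enabled: bool) -> List[Host]:
    """Active host first; standbys are candidates only when standby reads are on."""
    active = [h for h in hosts if h.is_active]
    standbys = [h for h in hosts if not h.is_active]
    return active + (standbys if standby_reads_enabled else [])


def route_pull_query(
    hosts: List[Host],
    standby_reads_enabled: bool,
    forward: Callable[[Host], str],
) -> str:
    """Try each candidate in order and return the first successful result."""
    failures = []
    for host in hosts_to_try(hosts, standby_reads_enabled):
        try:
            return forward(host)
        except RuntimeError as exc:
            failures.append(f"{host.name}: {exc}")
    # Hypothetical improved message: keep the real causes and say why no
    # standby was attempted.
    hint = "" if standby_reads_enabled else " (standby reads are disabled)"
    raise RuntimeError(
        "Exhausted hosts to try" + hint + "; causes: " + "; ".join(failures)
    )
```

In this sketch, a failed forward to the active node with standby reads off surfaces as `Exhausted hosts to try (standby reads are disabled); causes: node-b: Failed to deserialise object`, which points the user at both the configuration and the real error in one message.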

patrickstuedi commented 2 years ago

I took another look at this issue. I was able to reproduce it on 0.19.0 and also on 0.22.0, but the problem seems to have disappeared when using current master.