confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
89 stars 1.04k forks source link

Pull Query Not responding and sometime responding very late.. seems deadlock #9161

Open anupsdtiwari opened 2 years ago

anupsdtiwari commented 2 years ago

Background :- We did set up a new cluster of ksqlDB(0.23.1) from confluent 7.1.1 tarball with 4 nodes of m5.xlarge. Then we started running around 8-10 stream-table queries which resulted in our final state table.

Once we had our table ready and being populated in real time, we started doing a Load Test for 3k / 4k / 5k RPS. Initially for some time it ran fine for 3k with latency under 15ms and for 4k/5k latency was < 100 ms. All good till now.

After this we planned to run 3k load for 10-15 Hours in our Dev Setup and we observed that after 4 hours ksqlDB was not receiving any request and can see HTTP 4XX in our ALB logs. We did some debugging but found no issues at ALB end and when we checked ksqlDB server, we found that even for single pull query with a single key, ksqlDB was not responding and here we tried on all ksqlDB server one by one.All this was happening at night time so when we started debugging more later in day, we found a ticket which is not completely related but given us a thought of testing ksqlDB in single node.

We removed all other nodes and tested our load on a single node(Node1) with 3k and can see that since the Node1 was small it was not giving throughput of 3k so we reduced RPS to 1k which node was handling properly with some latency. Now we started another node(Node2) but hitting load test on the same single node(Node1) like earlier and can see in debug logs that few requests are being handled locally and few are being routed to Node2 which was expected(Refer "Normal Behaviour" Section in logs). After this we stopped sending requests to Node1 and started sending requests to Node2 and can see the same expected behaviour so far so good.

Then we again started sending requests to Node1 and it was able to respond perfectly BUT after that as soon as we started sending requests to Node2 in parallel to Node1, we can see gradual increase of avg latency time for 10-20 sec post which none of the nodes were able to respond to queries and can see no further pull query request to any of the node.Once ksqlDB goes in hanging state, it is not responding to any further pull queries even executed via Cli for a single key.

We tried restarting ksqlDB server on both nodes but now as soon as we start sending pull query requests to both nodes, it is not able to respond to any requests. However we can see in logs that requests are reaching to ksqlDB server(Check "Issue Started" Section in logs). This scenario we are able to reproduce every time now after restart.

Once ksqlDB goes into a hang test, we tried testing pull query via API for a single key to check single key flow in DEBUG logs and can see the same issue(See "Single User Testing" section of logs). Please let us know what is happening here to debug it further and solve it.

DEBUG LOGS:-

-- =============================== Normal Behaviour Started ================================================

[2022-05-28 23:10:42,546] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 102 (io.confluent.ksql.api.server.LoggingHandler:144) [2022-05-28 23:10:42,546] DEBUG Handling pull query for key ['1700000']-Optional.empty in partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:10:42,546] DEBUG Handling pull query for partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:10:42,546] DEBUG Before filtering: Active host HostInfo{host='...7', port=8088} , standby hosts [HostInfo{host='...6', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:10:42,546] DEBUG Filtered and ordered hosts: [Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}, Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371) [2022-05-28 23:10:42,546] DEBUG Query select from STATE_FINAL where stateid = '1700000' ; executed locally at host http://...*7:8088/ at timestamp 1653759642546. (io.confluent.ksql.physical.pull.HARouting:284)

[2022-05-28 23:10:42,546] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144) [2022-05-28 23:10:42,547] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144)

[2022-05-28 23:10:42,548] DEBUG Handling pull query for key ['40981084']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:10:42,548] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:10:42,548] DEBUG Before filtering: Active host HostInfo{host='...6', port=8088} , standby hosts [HostInfo{host='...7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:10:42,548] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371) [2022-05-28 23:10:42,548] DEBUG Query select from STATE_FINAL where stateid = '40981084' ; routed to host http://...*6:8088/ at timestamp 1653759642548. (io.confluent.ksql.physical.pull.HARouting:306)

[2022-05-28 23:10:42,550] DEBUG Handling pull query for key ['84981287']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:10:42,550] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:10:42,550] DEBUG Before filtering: Active host HostInfo{host='...6', port=8088} , standby hosts [HostInfo{host='...7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:10:42,550] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371) [2022-05-28 23:10:42,550] DEBUG Query select from STATE_FINAL where stateid = '84981287' ; routed to host http://...*6:8088/ at timestamp 1653759642550. (io.confluent.ksql.physical.pull.HARouting:306)

[2022-05-28 23:10:42,554] DEBUG Handling pull query for key ['8373283']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:10:42,555] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:10:42,555] DEBUG Before filtering: Active host HostInfo{host='...6', port=8088} , standby hosts [HostInfo{host='...7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:10:42,555] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 102 (io.confluent.ksql.api.server.LoggingHandler:144) [2022-05-28 23:10:42,555] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371) [2022-05-28 23:10:42,555] DEBUG Query select from STATE_FINAL where stateid = '8373283' ; routed to host http://...*6:8088/ at timestamp 1653759642555. (io.confluent.ksql.physical.pull.HARouting:306)

-- =============================== Normal Behaviour Ended ================================================

-- =========================== Issue Started (When i started hitting pull Query API requests on another node ======================

[2022-05-28 23:10:42,555] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 936 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 102 (io.confluent.ksql.api.server.LoggingHandler:144)

[2022-05-28 23:10:42,555] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144) [2022-05-28 23:10:42,556] DEBUG Handling pull query for key ['85005993']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:10:42,556] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:10:42,556] DEBUG Before filtering: Active host HostInfo{host='...6', port=8088} , standby hosts [HostInfo{host='...7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:10:42,557] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)

[2022-05-28 23:10:42,557] INFO ...45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144) [2022-05-28 23:10:42,558] DEBUG Handling pull query for key ['85035395']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:10:42,558] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:10:42,558] DEBUG Before filtering: Active host HostInfo{host='...6', port=8088} , standby hosts [HostInfo{host='...7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:10:42,558] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)

-- ========= Single User Testing : curl -H "Content-Type: application/json" -X POST --data @body.json http://*.*.*.*7:8088/query =============

[2022-05-28 23:30:41,411] DEBUG Handling pull query for key ['1280']-Optional.empty in partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:30:41,411] DEBUG Handling pull query for partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:30:41,411] DEBUG Before filtering: Active host HostInfo{host='...7', port=8088} , standby hosts [HostInfo{host='...6', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:30:41,411] DEBUG Filtered and ordered hosts: [Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}, Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371) [2022-05-28 23:30:41,421] DEBUG Send heartbeat to host ...6:8088 at 1653760841421 (io.confluent.ksql.rest.server.HeartbeatAgent:329) [2022-05-28 23:30:41,485] DEBUG Receive heartbeat at: 1653760841485 from host: ...6:8088 (io.confluent.ksql.rest.server.HeartbeatAgent:118)

[2022-05-28 23:34:58,132] DEBUG Host: ...6:8088 has 0 missing heartbeats (io.confluent.ksql.rest.server.HeartbeatAgent:306) [2022-05-28 23:34:58,185] DEBUG Receive heartbeat at: 1653761098185 from host: ...6:8088 (io.confluent.ksql.rest.server.HeartbeatAgent:118) [2022-05-28 23:34:58,186] INFO ...6 - - [Sat, 28 May 2022 18:04:58 GMT] "POST /heartbeat HTTP/1.1" 200 13 "-" "-" 57 (io.confluent.ksql.api.server.LoggingHandler:144) [2022-05-28 23:34:58,220] DEBUG Handling pull query for key ['84981287']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171) [2022-05-28 23:34:58,221] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127) [2022-05-28 23:34:58,221] DEBUG Before filtering: Active host HostInfo{host='...6', port=8088} , standby hosts [HostInfo{host='...7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347) [2022-05-28 23:34:58,221] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = ...6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = ...7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371) [2022-05-28 23:34:58,221] DEBUG Send heartbeat to host ...6:8088 at 1653761098221 (io.confluent.ksql.rest.server.HeartbeatAgent:329) [2022-05-28 23:34:58,285] DEBUG Receive heartbeat at: 1653761098285 from host: ...6:8088 (io.confluent.ksql.rest.server.HeartbeatAgent:118)

anupsdtiwari commented 2 years ago

-- ============ ksqlDB Server.Properties File ==============

listeners=http://*.*.*7:8088 ksql.extension.dir=/opt/confluent/confluent-7.1.1/etc/ksqldb/ext

------ Logging config -------

ksql.logging.processing.topic.auto.create=true ksql.logging.processing.stream.auto.create=true ksql.logging.processing.rows.include=true

------ External service config -------

bootstrap.servers=...1:9092,...2:9092,..*.3:9092 ksql.service.id=mTest

-------- Additional Prod Settings -----------

ksql.streams.producer.retries=2147483647 ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807 ksql.streams.producer.request.timeout.ms=300000 ksql.streams.producer.max.block.ms=9223372036854775807 ksql.streams.replication.factor=3 ksql.sink.replicas=3 ksql.streams.state.dir=/data/ksql ksql.streams.num.standby.replicas=2 ksql.query.pull.enable.standby.reads=true ksql.heartbeat.enable=true ksql.lag.reporting.enable=true ksql.query.pull.metrics.enabled=true ksql.udf.collect.metrics=true ksql.streams.num.stream.threads=1 ksql.plugins.rocksdb.cache.size=4294967296 ksql.streams.commit.interval.ms=2000

ksql.streams.metrics.recording.level=TRACE

ksql.streams.metrics.recording.level=DEBUG compression.type=snappy

anupsdtiwari commented 2 years ago

One more thing which i observed is, when i stop my load test and after sometime when i stop ksqlDB process then can see below logs related to pull queries and it seems that queries which were in queued due to older queries which got stucked are being replayed and some stuck queries are coming out with error :-

Logs :-

DEBUG Query select from STATE_FINAL where STATEID = '323847' ; routed to host http://..6:8088/ at timestamp 1653919699663. (io.confluent.ksql.physical.pull.HARouting:306) INFO ..6 - - "POST /query HTTP/1.1" 200 0 "-" "-" 321 (io.confluent.ksql.api.server.LoggingHandler:144) WARN Interrupted while writing to connection stream (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:139) INFO ..6 - - "POST /query HTTP/1.1" 200 0 "-" "-" 321 (io.confluent.ksql.api.server.LoggingHandler:144) WARN Interrupted while writing to connection stream (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:139

INFO ..*6 - - "POST /query HTTP/1.1" 200 0 "-" "-" 321 (io.confluent.ksql.api.server.LoggingHandler:144) ERROR Failed to queue all rows (io.confluent.ksql.physical.pull.HARouting:459) ERROR Failed to handle request 500 /query (io.confluent.ksql.api.server.FailureHandler:38) java.lang.IllegalStateException: Worker executor closed at io.vertx.core.impl.WorkerExecutorImpl.executeBlocking(WorkerExecutorImpl.java:56) at io.confluent.ksql.rest.server.KsqlServerEndpoints.executeOnWorker(KsqlServerEndpoints.java:316) at io.confluent.ksql.rest.server.KsqlServerEndpoints.executeOldApiEndpointOnWorker(KsqlServerEndpoints.java:328) at io.confluent.ksql.rest.server.KsqlServerEndpoints.executeQueryRequest(KsqlServerEndpoints.java:193) at io.confluent.ksql.api.server.ServerVerticle.lambda$handleQueryRequest$5(ServerVerticle.java:252) at io.confluent.ksql.api.server.OldApiUtils.handleOldApiRequest(OldApiUtils.java:72) at io.confluent.ksql.api.server.ServerVerticle.handleQueryRequest(ServerVerticle.java:248) at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1038) at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:101) at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:132) at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.doEnd(BodyHandlerImpl.java:296) at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.end(BodyHandlerImpl.java:276) at io.vertx.ext.web.handler.impl.BodyHandlerImpl.lambda$handle$0(BodyHandlerImpl.java:87) at io.vertx.core.http.impl.HttpServerRequestImpl.onEnd(HttpServerRequestImpl.java:525) at io.vertx.core.http.impl.HttpServerRequestImpl.handleEnd(HttpServerRequestImpl.java:511) at io.vertx.core.http.impl.Http1xServerConnection.handleEnd(Http1xServerConnection.java:176) at io.vertx.core.http.impl.Http1xServerConnection.handleContent(Http1xServerConnection.java:163) at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:140) at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366) at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)

anupsdtiwari commented 2 years ago

Team,

After digging a bit more, it seems a bit related to https://github.com/confluentinc/ksql/pull/8164

Also changing ksql.query.pull.thread.pool.size / ksql.query.pull.router.thread.pool.size to a bit higher number helps queries to come out for lower RPS during load test but still higher RPS are going in Hang state so not sure how to tune it.

I can see a similar issue in ksqlDB 0.25.1. I don't know what is happening and why it is happening so frequently because ideally even with default settings ksqlDB should be able to handle 5k RPS easily.

anupsdtiwari commented 2 years ago

@agavra any update on this issue ? It seems that due to this, we are not able to fully utilise our cluster. I have shared threadDumps in https://github.com/confluentinc/ksql/issues/9331

anupsdtiwari commented 2 years ago

Also i have observed that if i bombard ksqlDB with higher RPS for which it is hitting roof throughput( like for 5k RPS test, it is limiting throughput to 4500) then for such constant rps , GC is happening very quickly and resulting in hanging state of application thread.