confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Push queries don't work with slow clients or big data #5384

Open big-andy-coates opened 4 years ago

big-andy-coates commented 4 years ago

Push queries that generate a large output volume, or where clients are consuming the output slowly, cause the Kafka consumer to be evicted from the consumer group.

Internally, push queries take the output from the Streams topology and write it to a blocking queue. Another thread reads from the queue and writes the data to the network socket the client is consuming results from. If the queue is full, the write blocks until the server thread can pop rows from the queue and write them to the network socket.
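For illustration, here is a minimal sketch of that handoff, using generic names rather than ksqlDB's actual classes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the push-query handoff described above.
public class PushQueryHandoff {

    // Bounded queue between the Streams topology thread and the writer thread.
    private final BlockingQueue<String> rowQueue = new ArrayBlockingQueue<>(100);

    // Called from the Streams topology (e.g. a foreach over the result stream).
    // If the queue is full, put() blocks the StreamThread until space frees up.
    public void onRow(String row) throws InterruptedException {
        rowQueue.put(row);
    }

    // Runs on a separate server thread: drains the queue and writes each row
    // to the client's network socket (represented here by a simple callback).
    public void writeLoop(java.util.function.Consumer<String> socketWriter)
            throws InterruptedException {
        while (true) {
            socketWriter.accept(rowQueue.take());
        }
    }
}
```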

The blocking write to the queue can cause the Kafka consumers used by the Streams topology to be evicted from their consumer group. This consumer-group instability causes unnecessary load and network chatter, and sometimes results in transient queries that output nothing, or that stop outputting rows before their limit is hit.
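The likely mechanism, assuming standard Kafka consumer behaviour: while the StreamThread is blocked in the queue write it cannot call poll(), and once max.poll.interval.ms elapses the broker considers the consumer failed and rebalances the group. A tiny, illustrative snippet showing the relevant setting:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative only: the consumer setting that governs eviction when the
// topology thread stops polling because it is blocked on the queue write.
public class PollIntervalExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Default is 5 minutes; if poll() is not called within this window the
        // consumer is removed from the group and a rebalance is triggered,
        // producing log lines like those below.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
        System.out.println(props);
    }
}
```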

The logs are also spammed with many lines like this:

[2020-05-18 10:27:03,509] INFO [Consumer clientId=_confluent-ksql-default_transient_3329068293872885753_1589794005729-2dd0c331-af0d-464b-ac10-d74b7db96973-StreamThread-3-consumer, groupId=_confluent-ksql-default_transient_3329068293872885753_1589794005729] Discovered group coordinator localhost:50896 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:814)
[2020-05-18 10:27:03,509] INFO [Consumer clientId=_confluent-ksql-default_transient_3329068293872885753_1589794005729-2dd0c331-af0d-464b-ac10-d74b7db96973-StreamThread-3-consumer, groupId=_confluent-ksql-default_transient_3329068293872885753_1589794005729] Group coordinator localhost:50896 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:866)
big-andy-coates commented 4 years ago

Update: In the latest Vert.x version of the server/client, the endpoints used by the CLI for push queries (and probably PRINT) don't apply any backpressure. The rows output by the Streams topology are queued against the response. This queuing looks to be unbounded, though I'm no Vert.x expert. If it is, this is a problem: a slow client, or a query that returns a lot of data, can result in large amounts of memory being allocated for buffered rows.

@purplefox Can you confirm whether this is unbounded? Was this intentional?

Previously, a bounded blocking queue limited the number of processed rows held in memory while waiting for the client to consume them. The blocking queue is still there; it just no longer limits anything.
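For reference, a sketch of what applying backpressure on the Vert.x side could look like; this is not the code from the fix below, and hooks such as pauseRowSource/resumeRowSource are hypothetical. Vert.x write streams expose writeQueueFull() and drainHandler(), which can pause the row source when the response buffer fills and resume it once it drains:

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;

// Sketch only -- shows the standard Vert.x write-stream backpressure pattern:
// stop producing rows when the response's write queue is full, resume from the
// drain handler once it has emptied.
public class BackpressuredRowWriter {

    private final HttpServerResponse response;
    private final Runnable pauseRowSource;   // hypothetical hooks into the
    private final Runnable resumeRowSource;  // queue/topology side

    public BackpressuredRowWriter(HttpServerResponse response,
                                  Runnable pauseRowSource,
                                  Runnable resumeRowSource) {
        this.response = response;
        this.pauseRowSource = pauseRowSource;
        this.resumeRowSource = resumeRowSource;
    }

    public void writeRow(String jsonRow) {
        response.write(Buffer.buffer(jsonRow + "\n"));
        if (response.writeQueueFull()) {
            // Pause the source and resume only after Vert.x has flushed enough
            // bytes to the socket, bounding memory used per connection.
            pauseRowSource.run();
            response.drainHandler(v -> resumeRowSource.run());
        }
    }
}
```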

purplefox commented 4 years ago

I have fixed the lack of backpressure here: https://github.com/confluentinc/ksql/pull/5386