confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Inconsistent results in pull queries with distributed KsqlDB setup #10241

Open xneg opened 4 months ago

xneg commented 4 months ago

Setup

First of all, we have 6 machines, each running its own ksqlDB instance in Docker from the image confluentinc/ksqldb-server, version 0.29.0.

Second, each server is configured like this:

listeners=http://0.0.0.0:8088/
ksql.advertised.listener is set for each node
ksql.heartbeat.enable=true
ksql.streams.num.standby.replicas=1
ksql.query.pull.enable.standby.reads=true

We have a scenario very similar to what is described here.

  1. We have an input topic named events with 60 partitions.
  2. We declared a stream:
    CREATE STREAM EVENTS (EVENT_TYPE STRING, TS STRING) WITH (CLEANUP_POLICY='delete', FORMAT='json', KAFKA_TOPIC='events', TIMESTAMP='ts', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
  3. We defined a table with aggregations like this:
    CREATE TABLE EVENTS_HOURLY_COUNTS AS
    SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*)
    FROM EVENTS
    WINDOW TUMBLING ( SIZE 1 HOURS )
    GROUP BY EVENTS.ROW_PARTITION
    EMIT CHANGES;

    This created 3 topics for us: 1 visible and 2 hidden.

    Kafka Topic                                                                                             | Partitions | Partition Replicas
    -------------------------------------------------------------------------------------------------------------------------------------------
    EVENTS_HOURLY_COUNTS                                                                                      | 60         | 2
    _confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-Aggregate-Materialize-changelog        | 60         | 2
    _confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-GroupBy-repartition                    | 60         | 2

The problem

When we issue pull queries against this table, they sporadically return inconsistent results, with no errors in the logs. Our queries look like this:

SELECT WINDOWSTART, partition, event_count FROM events_hourly_counts  WHERE WINDOWSTART >= 1708452000000 AND WINDOWEND  <= 1708509600000
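
For reference, the bounds in the query above can be decoded to see how many rows a complete answer should contain. This is only a sketch: the helper name `describe_range` is hypothetical, and it assumes the bounds are aligned to hour boundaries and that every one of the 60 keys has at least one event in every hourly window.

```python
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000  # size of one tumbling window in milliseconds


def describe_range(start_ms, end_ms, keys_per_window=60):
    """Return (start, end, window count, expected row count) for a pull query range.

    Assumes start_ms and end_ms are aligned to hour boundaries and that
    every key appears in every window (an assumption, not a guarantee).
    """
    start = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)
    end = datetime.fromtimestamp(end_ms / 1000, tz=timezone.utc)
    windows = (end_ms - start_ms) // HOUR_MS
    return start.isoformat(), end.isoformat(), windows, windows * keys_per_window


print(describe_range(1708452000000, 1708509600000))
```

Under those assumptions, this range covers 16 hourly windows, so a complete result would have 960 rows.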

We run them against periods that have already closed, so we expect that newly arrived data should not interfere with the results. We expect to get data from 60 partitions per hour, but sometimes (roughly 1 query out of 10) we get fewer rows, anywhere from 44 to 54, and sometimes even 61. My guess is that some of the nodes in our multi-node setup time out and do not return their results, but without any errors in the logs it is hard to investigate further.
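
To narrow down which rows go missing or show up twice, the result of each pull query can be compared against the full set of expected (window, partition) pairs. The following is a minimal sketch, not part of our actual tooling: the function name `diff_result` is hypothetical, rows are assumed to be already parsed into `(windowstart_ms, partition, event_count)` tuples, and it assumes every partition has events in every hour.

```python
from collections import Counter

HOUR_MS = 60 * 60 * 1000  # size of one tumbling window in milliseconds


def diff_result(rows, start_ms, end_ms, partitions=range(60)):
    """Compare pull query rows against the expected (window, partition) pairs.

    rows: iterable of (windowstart_ms, partition, event_count) tuples.
    Returns three sorted lists: missing, duplicated, and unexpected keys.
    """
    partitions = list(partitions)
    # Every hourly window start in [start_ms, end_ms) paired with every partition.
    expected = {
        (window, partition)
        for window in range(start_ms, end_ms, HOUR_MS)
        for partition in partitions
    }
    # Count how many times each (window, partition) pair was returned.
    seen = Counter((window, partition) for window, partition, _ in rows)
    returned = set(seen)
    missing = sorted(expected - returned)
    duplicated = sorted(key for key, count in seen.items() if count > 1)
    unexpected = sorted(returned - expected)
    return missing, duplicated, unexpected
```

If the missing keys cluster on the partitions hosted by one server, that would point at a single node; a duplicated key would explain a result with 61 rows.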

If anyone could help or point us in the right direction for where to dig, it would be great. Thanks in advance!

xneg commented 4 months ago

One addition.

I tried to create a table with only one partition:

CREATE TABLE EVENTS_HOURLY_COUNTS WITH (PARTITIONS=1)
AS SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*)
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;

So the table has only one partition, but it still collects keys 0 through 59, and the behavior is the same. When I run a pull query covering 20 hours, I expect to receive 1200 rows (20 hours × 60 keys). Most of the time I get 1200 rows, but from time to time I get 1199, 936, or even 1201 rows!