metrico / qryn

⭐️ All-in-One Polyglot Observability with OLAP Storage for Logs, Metrics, Traces & Profiles. Drop-in Grafana Cloud replacement compatible with Loki, Prometheus, Tempo, Pyroscope, OpenTelemetry, Datadog and beyond :rocket:
https://qryn.dev
GNU Affero General Public License v3.0
1.24k stars 68 forks

Excessive data analysis when querying #542

Open KhafRuslan opened 3 months ago

KhafRuslan commented 3 months ago

We have run into problems on large volumes of data. We examined the SQL query that qryn generates and have some questions about it:

explain indexes = 1
WITH sel_a AS (
  select `samples`.`string` as `string`,
         `samples`.`fingerprint` as `fingerprint`,
         samples.timestamp_ns as `timestamp_ns`
  from qryn.samples_v3_dist as `samples`
  where ((`samples`.`timestamp_ns` between 1722865286192000000 and 1722865586192000000)
     and (`samples`.`type` in (0,0)))
    and (samples.fingerprint IN (
      select `sel_1`.`fingerprint`
      from (select `fingerprint` from `qryn`.`time_series_gin`
            where ((`key` = 'k8s_clusters') and (`val` = 'k8s-omp-test'))) as `sel_1`
      inner any join
           (select `fingerprint` from `qryn`.`time_series_gin`
            where ((`key` = 'ClientId') and (`val` = '123'))) as `sel_2`
      on `sel_1`.`fingerprint` = `sel_2`.`fingerprint`))
  order by `timestamp_ns` desc
  limit 100)
select JSONExtractKeysAndValues(time_series.labels, 'String') as `labels`, sel_a.*
from sel_a
left any join `qryn`.`time_series_dist` AS time_series
  on `sel_a`.`fingerprint` = time_series.fingerprint
order by `labels` desc, `timestamp_ns` desc

The first part is fine; it filters by time:

1 | Expression (Projection)
2 | Sorting (Sorting for ORDER BY)
3 | Expression (Before ORDER BY)
4 | Join (JOIN FillRightFirst)
5 | Expression ((Before JOIN + Projection))
6 | Limit (preliminary LIMIT (without OFFSET))
7 | Sorting (Merge sorted streams after aggregation stage for ORDER BY)
8 | Union
9 | Expression
10 | CreatingSets (Create sets before main query execution)
11 | Sorting (Sorting for ORDER BY)
12 | Expression (Before ORDER BY)
13 | ReadFromMergeTree (qryn.samples_v3)
14 | Indexes:
15 | MinMax
16 | Keys:
17 | timestamp_ns
18 | Condition: and((timestamp_ns in (-Inf, 1722865586192000000]), (timestamp_ns in [1722865286192000000, +Inf)))
19 | Parts: 1/53
20 | Granules: 3/126065
21 | Partition
22 | Keys:
23 | toStartOfDay(toDateTime(divide(timestamp_ns, 1000000000)))
24 | Condition: and((toStartOfDay(toDateTime(divide(timestamp_ns, 1000000000))) in (-Inf, 1722805200]), (toStartOfDay(toDateTime(divide(timestamp_ns, 1000000000))) in [1722805200, +Inf)))
25 | Parts: 1/1
26 | Granules: 3/3
27 | PrimaryKey
28 | Keys:
29 | timestamp_ns
30 | Condition: and((timestamp_ns in (-Inf, 1722865586192000000]), (timestamp_ns in [1722865286192000000, +Inf)))
31 | Parts: 1/1
32 | Granules: 1/3

In the second part, we found that it performs a full scan of the `time_series_gin` table:

33 | CreatingSet (Create set for subquery)
34 | Expression ((Projection + Before ORDER BY))
35 | Join (JOIN FillRightFirst)
36 | Expression ((Before JOIN + (Projection + Before ORDER BY)))
37 | ReadFromMergeTree (qryn.time_series_gin)
38 | Indexes:
39 | MinMax
40 | Condition: true
41 | Parts: 30/30
42 | Granules: 189906/189906
43 | Partition
44 | Condition: true
45 | Parts: 30/30
46 | Granules: 189906/189906
47 | PrimaryKey
48 | Keys:
49 | key
50 | val
51 | Condition: and((key in ['k8s_clusters', 'k8s_clusters']), (val in ['k8s-omp-test', 'k8s-omp-test']))
52 | Parts: 30/30
53 | Granules: 14631/189906
54 | Expression ((Joined actions + (Rename joined columns + (Projection + Before ORDER BY))))
55 | ReadFromMergeTree (qryn.time_series_gin)
56 | Indexes:
57 | MinMax
58 | Condition: true
59 | Parts: 30/30
60 | Granules: 189906/189906
61 | Partition
62 | Condition: true
63 | Parts: 30/30
64 | Granules: 189906/189906
65 | PrimaryKey
66 | Keys:
67 | key
68 | val
69 | Condition: and((key in ['ClientId', 'ClientId']), (val in ['123', '123']))
70 | Parts: 28/30
71 | Granules: 28/189906
72 | ReadFromRemote (Read from remote replica)

The third part is similar, also analyzing a lot of data:

73 | Union
74 | Expression ((Joined actions + (Rename joined columns + ( + (Projection + Before ORDER BY)))))
75 | ReadFromMergeTree (qryn.time_series)
76 | Indexes:
77 | MinMax
78 | Condition: true
79 | Parts: 37/37
80 | Granules: 14630/14630
81 | Partition
82 | Condition: true
83 | Parts: 37/37
84 | Granules: 14630/14630
85 | PrimaryKey
86 | Condition: true
87 | Parts: 37/37
88 | Granules: 14630/14630
89 | Expression (( + ( + )))
90 | ReadFromRemote (Read from remote replica)

Is this join approach mandatory? Is there no way to also bind these subqueries to the time range, or to filter in some other way? The multiple-join approach does not seem to work well on large amounts of data.
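One way to express the time-binding idea, assuming `qryn.time_series_gin` carries a `date` column that participates in its partitioning (an assumption that would need to be checked against the actual qryn schema), is to restrict each label subquery to the dates covered by the query window, so the MinMax/partition indexes no longer report `Condition: true`:

```sql
-- Hypothetical rewrite of the fingerprint subquery: bound each
-- time_series_gin scan by date so ClickHouse can prune parts.
-- Assumes a `date` column exists on qryn.time_series_gin.
select sel_1.fingerprint
from (select fingerprint from qryn.time_series_gin
      where key = 'k8s_clusters' and val = 'k8s-omp-test'
        and date between toDate(fromUnixTimestamp64Nano(1722865286192000000))
                     and toDate(fromUnixTimestamp64Nano(1722865586192000000))) as sel_1
inner any join
     (select fingerprint from qryn.time_series_gin
      where key = 'ClientId' and val = '123'
        and date between toDate(fromUnixTimestamp64Nano(1722865286192000000))
                     and toDate(fromUnixTimestamp64Nano(1722865586192000000))) as sel_2
on sel_1.fingerprint = sel_2.fingerprint
```

The trade-off is that labels for long-lived series are only guaranteed to match if the series was (re)written within the queried date range.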

Denormalizing and storing the label data in another format may help. There are some options:

1) Store labels as Map(LowCardinality(String), String), as in the otel.otel_logs schema from this article: https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry#querying-the-map-type
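A minimal sketch of option 1, modeled on the blog's otel_logs layout. Table and column names here are illustrative, not qryn's actual schema:

```sql
-- Illustrative table with labels denormalized into a Map column.
CREATE TABLE logs_map_demo
(
    timestamp_ns UInt64,
    string       String,
    labels       Map(LowCardinality(String), String)
)
ENGINE = MergeTree
PARTITION BY toStartOfDay(toDateTime(timestamp_ns / 1000000000))
ORDER BY timestamp_ns;

-- Label filters become map lookups inside the same time-pruned scan,
-- avoiding the fingerprint join entirely:
SELECT timestamp_ns, string
FROM logs_map_demo
WHERE timestamp_ns BETWEEN 1722865286192000000 AND 1722865586192000000
  AND labels['k8s_clusters'] = 'k8s-omp-test'
  AND labels['ClientId'] = '123'
ORDER BY timestamp_ns DESC
LIMIT 100;
```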

2) See the section “Approach 3: JSON as pairwise arrays” here: https://www.propeldata.com/blog/how-to-store-json-in-clickhouse-the-right-way SigNoz also uses this approach: https://signoz.io/docs/userguide/logs_clickhouse_queries/
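A sketch of option 2 in the same spirit (again, names are illustrative rather than qryn's schema): keys and values live in two parallel arrays, and a label match is "the value at the position of the key":

```sql
-- Illustrative pairwise-array layout, similar to the SigNoz logs schema.
CREATE TABLE logs_arrays_demo
(
    timestamp_ns   UInt64,
    string         String,
    `labels.key`   Array(LowCardinality(String)),
    `labels.value` Array(String)
)
ENGINE = MergeTree
PARTITION BY toStartOfDay(toDateTime(timestamp_ns / 1000000000))
ORDER BY timestamp_ns;

-- indexOf finds the key's position; the value at that position must match.
SELECT timestamp_ns, string
FROM logs_arrays_demo
WHERE timestamp_ns BETWEEN 1722865286192000000 AND 1722865586192000000
  AND labels.value[indexOf(labels.key, 'ClientId')] = '123'
ORDER BY timestamp_ns DESC
LIMIT 100;
```

Both denormalized layouts also allow bloom-filter or token skip indexes on the label columns for further pruning.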

lmangani commented 2 months ago

Thanks for the suggestion! We're discussing this internally and will attempt some experiments. Updates will follow.

lmangani commented 2 months ago

Thanks for the report and suggestion @KhafRuslan We are implementing some optimizations for this use case. Updates will follow once ready to re-test!

lmangani commented 2 months ago

@KhafRuslan please let us know if you can retest and confirm if the improvement is noticeable. Thanks!