open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0
3.09k stars 2.38k forks source link

ClickHouse exporter produces duplicates and poor compression without sorting attributes #33634

Open JustinMason opened 5 months ago

JustinMason commented 5 months ago

Component(s)

exporter/clickhouse

Is your feature request related to a problem? Please describe.

The default table created by the exporter isn't a good pattern for optimizing compression and removing duplicates. ClickHouse does not sort the map values, so even though there may be duplicate records the order of their attributes may be different. This causes ClickHouse to treat them as unique records for storage and merge trees. This also effects ClickHouses compression so the same data takes up a lot more disk.

Describe the solution you'd like

We identified this issue and the solution was to use a NULL Engine for the primary table the Exporter writes to, then using a Materialized View we explicitly sort the attributes before insert. mapSort(Attributes) as Attributes,

After this the compression rate for billions of rows was greater than 250, making the storage needed much less. It also eliminated duplicates and helped streamline the increase functions so we could avoid extra processing.

This makes the initial table creation a bit trickier but it is critical in my experience.

Describe alternatives you've considered

No response

Additional context

No response

github-actions[bot] commented 5 months ago

Pinging code owners:

SpencerTorres commented 5 months ago

Interesting information, we actually have a PR open to update the table (#33611)

Considering the complexity of the materialized view required, it might be best to do this in the exporter code. Maps are unpredictable in Go, so we would need to convert it to a slice and sort it. Any thoughts on this approach?

hanjm commented 5 months ago

Yes, map sort can improve compression, clickhouse-go sdk support column.IterableOrderedMap https://github.com/ClickHouse/clickhouse-go/pull/1152, exporter can use this sdk type to write clickhouse map type with order, welcome a PR to try it.

crobert-1 commented 5 months ago

Removing needs triage based on response from code owners.

github-actions[bot] commented 1 month ago

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

hanjm commented 4 weeks ago

still valid

SpencerTorres commented 2 weeks ago

This can be closed once #35725 is merged

zdyj3170101136 commented 4 days ago

is this is an optimization?

here is my table definition:

CREATE TABLE default.test
(
    `timestamp` DateTime('UTC') CODEC(Delta(4), ZSTD(1)),
    `service` LowCardinality(String) CODEC(ZSTD(1)),
    `operation` LowCardinality(String) CODEC(ZSTD(1)),
    `tags.key` Array(LowCardinality(String)) CODEC(ZSTD(1)),
    `tags.value` Array(String) CODEC(ZSTD(1))
)
ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (-toUnixTimestamp(timestamp), service, operation)
SETTINGS index_granularity = 8192 

 CREATE TABLE default.test_1
(
    `timestamp` DateTime('UTC') CODEC(Delta(4), ZSTD(1)),
    `service` LowCardinality(String) CODEC(ZSTD(1)),
    `operation` LowCardinality(String) CODEC(ZSTD(1)),
    `tags` Map(LowCardinality(String), String) CODEC(ZSTD(1))
)
ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (-toUnixTimestamp(timestamp), service, operation)
SETTINGS index_granularity = 8192

 CREATE TABLE default.test_2
(
    `timestamp` DateTime('UTC') CODEC(Delta(4), ZSTD(1)),
    `service` LowCardinality(String) CODEC(ZSTD(1)),
    `operation` LowCardinality(String) CODEC(ZSTD(1)),
    `tags` Map(LowCardinality(String), String) CODEC(ZSTD(1))
)
ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (-toUnixTimestamp(timestamp), service, operation)
SETTINGS index_granularity = 8192

here is my insert:

insert into test_1 SELECT
                  timestamp, service, operation,
                  mapFromArrays(tags.key, tags.value) AS tags
              FROM test;
insert into test_2 select timestamp, service, operation, mapSort(tags) select * from test_1

seems the compressed bytes is no difference!

SELECT
                  name,
                  type,
                  data_compressed_bytes,
                  data_uncompressed_bytes,
                  (data_compressed_bytes / data_uncompressed_bytes) * 100 AS compression_rate,table
              FROM system.columns
              WHERE table like 'test%' and name like 'tag%'

SELECT
    name,
    type,
    data_compressed_bytes,
    data_uncompressed_bytes,
    (data_compressed_bytes / data_uncompressed_bytes) * 100 AS compression_rate,
    table
FROM system.columns
WHERE (table LIKE 'test%') AND (name LIKE 'tag%')

Query id: c45cc475-1420-41ad-aaf7-ed715cf4e064

┌─name───────┬─type────────────────────────────────┬─data_compressed_bytes─┬─data_uncompressed_bytes─┬──compression_rate─┬─table──┐
│ tags.key   │ Array(LowCardinality(String))       │              14258905 │               365525854 │ 3.900929262311497 │ test   │
│ tags.value │ Array(String)                       │             205625812 │              3444912929 │ 5.968969789308716 │ test   │
│ tags       │ Map(LowCardinality(String), String) │             219946760 │              3810479168 │ 5.772154899758791 │ test_1 │
│ tags       │ Map(LowCardinality(String), String) │             219910517 │              3810470959 │ 5.771216192596627 │ test_2 │
└────────────┴─────────────────────────────────────┴───────────────────────┴─────────────────────────┴───────────────────┴────────┘

clickhouse server version:

ClickHouse server version 23.8.16 revision 54465.

@JustinMason @SpencerTorres cc

earwin commented 4 days ago

@zdyj3170101136 this is both an optimization and correctness issue. E.g. GROUP BY ResourceAttributes will yield duplicate entries for various reorderings of the same set of attributes.

Your example is incomplete without knowing the layout the tags.key has in the original default.test table. Is it already de-facto sorted or not? Also the layout of the tags across the records, when these records are sorted according to (-toUnixTimestamp(timestamp), service, operation). If there are no contiguous runs of the same tagsets, there would be little advantage compression-wise.

Besides compression it is also a question of query efficiency, e.g. otel_metrics_gauge is at the moment ORDER BY (ServiceName, MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix)). Since the Attributes field is part of the sort key, compression won't be affected even if you write randomly-ordered keys to this map. But if you want to query the table by Attributes, you'll need to read more non-contiguous blocks.