apache / lucene

Apache Lucene open-source search software
https://lucene.apache.org/
Apache License 2.0

OLAP-like rollup during segment merge process [LUCENE-10427] #11463

Open asfimport opened 2 years ago

asfimport commented 2 years ago

Currently, many OLAP engines support a rollup feature, e.g. ClickHouse (AggregatingMergeTree) and Druid.

Rollup definition: https://athena.ecs.csus.edu/~mei/olap/OLAPoperations.php

One way to do rollup is to merge docs that share the same dimension values into one bucket and apply sum()/min()/max() operations on the metric fields during the segment compaction/merge process. This can significantly reduce the size of the data and speed up queries a lot.

 

An outline of how to do it

  1. Define the rollup logic: which fields are dimensions and which are metrics.
  2. Define the rollup aggregation for each metric field: max/min/sum ...
  3. Index sorting should use the same fields as the dimensions (see the sketch after this list).
  4. Do the rollup calculation during segment merge, just like other OLAP engines do.
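
For step 3, here is a minimal sketch of wiring the index sort to the dimension fields with Lucene's existing index-sorting API. The field names are taken from the sensor example below; this is only an illustration, not part of the proposed change:

import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;

// Index sorting must line up with the dimension fields so that docs belonging to the
// same bucket become adjacent and can be rolled up in one streaming pass during merge.
IndexWriterConfig iwc = new IndexWriterConfig();
iwc.setIndexSort(new Sort(
    new SortedNumericSortField("event_hour", SortField.Type.LONG),
    new SortedNumericSortField("device_id", SortField.Type.LONG),
    new SortedNumericSortField("city_id", SortField.Type.LONG)));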

 

Consider this scenario

We use ES to ingest real-time raw temperature data from each sensor device every minute, along with many dimension fields. A user may want to query the data like "what is the max temperature of some device within some/the latest hour" or "what is the max temperature of some city within some/the latest hour".

With that, we can define the following fields and rollup definitions:

  1. event_hour (rounded to hour granularity)
  2. device_id (dimension)
  3. city_id (dimension)
  4. temperature (metric, max/min rollup logic)

The raw data will periodically be rolled up to hour granularity during the segment merge process, which should ideally save about 60x in storage in the end.
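
For instance, 60 minute-level raw docs in the same hour and bucket (the numbers here are made up for illustration), e.g.

(event_hour=2022-03-01T10, device_id=17, city_id=3, temperature=21)
(event_hour=2022-03-01T10, device_id=17, city_id=3, temperature=24)
... 58 more readings for the same bucket ...

would collapse into a single doc per (event_hour, device_id, city_id) bucket, carrying temperature min=21 and max=24, hence the roughly 60x reduction.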

 

How we do rollup in segment merge

Bucket: docs belong to the same bucket if all of their dimension values are the same.

  1. For the doc values merge, we emit the normal mappedDocId when we encounter a new bucket in DocIDMerger.
  2. Since the index sorting fields are the same as the dimension fields, when we encounter more docs in the same bucket, we emit a special mappedDocId from DocIDMerger.
  3. In DocValuesConsumer.mergeNumericField, when we meet the special mappedDocId, we do a rollup calculation on the metric fields and fold the result into the first doc of the bucket. The calculation works like a streaming merge-sort rollup (see the sketch after this list).
  4. We discard all docs with the special mappedDocId because their metrics are already folded into the first doc of the bucket.
  5. In the BKD/postings structures, we likewise discard all docs with the special mappedDocId and only keep the first doc id of each bucket in the BKD/postings data. It should be simple.
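
As a way to picture steps 1-4, here is a minimal, self-contained model of the streaming fold, outside of Lucene. It assumes the docs already arrive in index-sort order (so a bucket is a run of consecutive docs with the same dimension key) and uses MAX as the only aggregation; in the real patch this logic would live in DocIDMerger / DocValuesConsumer.mergeNumericField rather than in a standalone class.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class StreamingMaxRollup {
  // A doc reduced to its dimension key and a single numeric metric, for illustration.
  record Doc(String dimensionKey, long metric) {}

  // Folds consecutive docs with the same dimension key into the first doc of the bucket.
  static List<Doc> rollupMax(List<Doc> docsInIndexSortOrder) {
    List<Doc> out = new ArrayList<>();
    String currentKey = null;
    long folded = Long.MIN_VALUE;
    for (Doc doc : docsInIndexSortOrder) {
      if (Objects.equals(doc.dimensionKey(), currentKey)) {
        // Same bucket: fold the metric and drop this doc, which is what the
        // "special mappedDocId" achieves during the real merge.
        folded = Math.max(folded, doc.metric());
      } else {
        if (currentKey != null) {
          out.add(new Doc(currentKey, folded)); // emit the previous bucket's first doc
        }
        currentKey = doc.dimensionKey();
        folded = doc.metric();
      }
    }
    if (currentKey != null) {
      out.add(new Doc(currentKey, folded));
    }
    return out;
  }
}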

 

How to define the logic

 

import java.util.List;

/** Declares which fields are dimensions and how each metric field is aggregated. */
public class RollupMergeConfig {
  private final List<String> dimensionNames;
  private final List<RollupMergeAggregateField> aggregateFields;

  public RollupMergeConfig(List<String> dimensionNames,
                           List<RollupMergeAggregateField> aggregateFields) {
    this.dimensionNames = dimensionNames;
    this.aggregateFields = aggregateFields;
  }
}

public class RollupMergeAggregateField {
  private final String name;
  private final RollupMergeAggregateType aggregateType;

  public RollupMergeAggregateField(String name, RollupMergeAggregateType aggregateType) {
    this.name = name;
    this.aggregateType = aggregateType;
  }
}

public enum RollupMergeAggregateType {
  COUNT,
  SUM,
  MIN,
  MAX,
  CARDINALITY // if a data sketch is stored in binary doc values, we can do a union
}
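
For the sensor scenario above, wiring this up could look roughly as follows. This is only a sketch against the classes above; how a single raw temperature field maps to both MIN and MAX is not pinned down here, so two separate rolled-up metric fields are assumed:

import java.util.List;

RollupMergeConfig config = new RollupMergeConfig(
    List.of("event_hour", "device_id", "city_id"),
    List.of(
        new RollupMergeAggregateField("temperature_min", RollupMergeAggregateType.MIN),
        new RollupMergeAggregateField("temperature_max", RollupMergeAggregateType.MAX)));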

 

 

I have written the initial code at a basic level. I can submit a complete PR if you think this feature is worth trying.


Migrated from LUCENE-10427 by Suhan Mao, updated Mar 29 2022

asfimport commented 2 years ago

Adrien Grand (@jpountz) (migrated from JIRA)

I know that the Elasticsearch team is looking into doing things like that, but on top of Lucene by creating another index that has a different granularity instead of having different granularities within the same index and relying on background merges for rollups.

At first sight, doing it within the same index feels a bit scary to me.

What would you think about doing it on top of Lucene instead? E.g. similarly to how the faceting module maintains a side-car taxonomy index, maybe one could maintain a side-car rollup index to speed up aggregations.

asfimport commented 2 years ago

Suhan Mao (migrated from JIRA)

@jpountz Thanks for your reply!

As far as I know, the current rollup implementation in ES periodically runs a composite aggregation to compute the aggregated result and inserts it into another index.

But this approach has several disadvantages:

  1. It still needs to store the detailed data, which may not be needed by users who only want to run aggregate queries. This is what the ClickHouse aggregating merge tree or Druid does currently: they only store aggregated data. But as you mentioned, we can create a sidecar index besides the raw data index; I think that is also acceptable as a first step.
  2. The composite aggregation can cause OOM problems and may be very slow if the data volume is very big. For example, if the query granularity is 1h, the composite aggregation has to extract all the buckets within one hour from the raw data.
  3. Cronjob-like scheduled queries cannot handle late-arriving data: if data belonging to a previous time interval arrives after the query has run, it will be missing from the rollup index, which causes accuracy issues.
  4. From a resource consumption perspective, if we must merge segments anyway, why not do the rollup as part of the merge within one round of IO?
  5. If the rollup granularity is one hour, the latest hour of data is not visible in the rollup index because the hourly scheduled composite aggregate query has not run yet.

To answer your questions

How we can start from scratch

I think we can start with a sidecar solution first. Assume that index A is the index storing the raw data, and index A' is a sidecar index that is continuously rolled up.

Assume that the schema of index A is:

d0 time, d1 long, d2 keyword, m1 long, m2 long, m3 binary(hll), x1, x2, x3 ...

The x1, x2 and x3 fields are not related to the rollup; they are just additional normal fields.

d0 is the event time, d1 and d2 are dimensions, and m1, m2 and m3 are metrics.

If we want to roll up the data to hourly granularity, we can create a rollup sidecar index A' which only contains the d0, d1, d2, m1, m2, m3 fields and does the rollup during its merge process. Users can query A or A' accordingly.

What's more, we can create several rollup indices, which are often called "materialized views" in OLAP scenarios.

For example, if we need another view that only stores d0, d1 and m3 at a daily rollup granularity, we can create an additional sidecar index A''.

Users only need to write the raw data once to index A, and all rollup calculations are performed inside Lucene. Users then query the appropriate level of index accordingly.
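
As a purely hypothetical illustration of the multi-view idea (none of these names exist in Lucene today), the two sidecar views A' and A'' from the example could be described along these lines:

import java.util.List;

class RollupViews {
  // Hypothetical descriptor of one rollup sidecar ("materialized view") over index A.
  record RollupViewDefinition(
      String name,             // e.g. "A_hourly"
      String timeField,        // d0
      String granularity,      // "1h" or "1d"
      List<String> dimensions, // subset of A's dimensions
      List<String> metrics) {} // subset of A's metrics

  static final List<RollupViewDefinition> EXAMPLE = List.of(
      new RollupViewDefinition("A_hourly", "d0", "1h", List.of("d1", "d2"), List.of("m1", "m2", "m3")),
      new RollupViewDefinition("A_daily", "d0", "1d", List.of("d1"), List.of("m3")));
}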

 

What do you think?

 

 

 

asfimport commented 2 years ago

Suhan Mao (migrated from JIRA)

@jpountz sorry to interrupt you, but could you share your opinion on this rollup feature? Is it worth moving forward with, or does it need more discussion?

asfimport commented 2 years ago

Adrien Grand (@jpountz) (migrated from JIRA)

Thanks, I understand better now. With the sidecar approach, could you compute rollups at index time by performing updates instead of hooking into the merging process? For instance, when a user adds a new sample, you could retrieve the data for the current <your-data-granularity-goes-here> bucket for the given dimensions and update the min/max/sum values?

asfimport commented 2 years ago

Suhan Mao (migrated from JIRA)

@jpountz Sorry for the late reply and thanks for your suggestion. I understand that computing the rollup at index time is easy to implement, but there are still some drawbacks that should be taken into consideration.

It will slow down indexing performance because it needs extra actions compared to an append-only write (see the sketch after the list):

  1. Invoke a term query to retrieve all the fields.
  2. Compute the rollup logic and save the new values.
  3. Delete the original doc.
  4. Index the new doc.
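
A rough sketch of that read-modify-write path for a single bucket, keyed by a made-up "bucket_key" keyword field (all field names here are illustrative, and the searcher has to be a reasonably fresh NRT searcher, which is part of the cost):

import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;

void upsertBucket(IndexSearcher searcher, IndexWriter writer,
                  String bucketKey, long temperature) throws IOException {
  // 1. Term query to retrieve the current rolled-up doc for this bucket, if any.
  long max = temperature;
  TopDocs hits = searcher.search(new TermQuery(new Term("bucket_key", bucketKey)), 1);
  if (hits.totalHits.value > 0) {
    // 2. Compute the rollup logic against the previously stored value.
    Document existing = searcher.doc(hits.scoreDocs[0].doc);
    max = Math.max(max, existing.getField("temperature_max").numericValue().longValue());
  }
  Document doc = new Document();
  doc.add(new StringField("bucket_key", bucketKey, Field.Store.YES));
  doc.add(new StoredField("temperature_max", max));
  doc.add(new NumericDocValuesField("temperature_max", max));
  // 3 + 4. updateDocument deletes the old doc for this bucket and indexes the new one.
  writer.updateDocument(new Term("bucket_key", bucketKey), doc);
}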

 

I think the main concern is that you do not want to change the Lucene merge semantics, so I have come up with a new approach to do the rollup.

  1. The index sort refers to all dimension fields.
  2. The index is written as a normal one and we continuously append documents.
  3. Run a special rollup scheduler that does a merge sort on the doc values files from different segments and performs the rollup computation in a streaming way. It can be implemented inside Lucene, or we can do it externally (see the sketch after this list).
  4. Append the newly calculated docs to the index and delete the docs from the original segments.
  5. Normal segment merges still work to reclaim the deleted docs/segments.
  6. There will be some interaction between the rollup scheduler and the merge scheduler to make the whole process run more smoothly.
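
A very rough sketch of the external variant of step 3, assuming each hour is rolled up exactly once. The field names and the buildRolledUpDocs() helper are made up, and collecting every hit in one sorted TopDocs is only for brevity:

import java.io.IOException;
import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TopDocs;

void rollupHour(IndexSearcher searcher, IndexWriter writer,
                long hourStartMillis, long hourEndMillis) throws IOException {
  // Pick a closed time range, read its docs in dimension order, fold each bucket
  // (e.g. with something like the streaming fold sketched earlier), then delete the
  // raw docs and append the rolled-up replacements; normal merges reclaim the deletes.
  Query hour = LongPoint.newRangeQuery("event_time", hourStartMillis, hourEndMillis - 1);
  Sort byDimensions = new Sort(
      new SortedNumericSortField("device_id", SortField.Type.LONG),
      new SortedNumericSortField("city_id", SortField.Type.LONG));
  TopDocs hits = searcher.search(hour, 10_000_000, byDimensions);
  List<Document> rolledUp = buildRolledUpDocs(searcher, hits.scoreDocs); // hypothetical helper
  writer.deleteDocuments(hour);          // drop the raw docs for this hour ...
  for (Document doc : rolledUp) {
    writer.addDocument(doc);             // ... and append the rolled-up replacements
  }
}

// Hypothetical: folds the dimension-sorted hits into one doc per bucket, as sketched earlier.
List<Document> buildRolledUpDocs(IndexSearcher searcher, ScoreDoc[] sortedHits) throws IOException {
  throw new UnsupportedOperationException("bucket folding not shown in this sketch");
}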

What do you think?