Open bharath-techie opened 7 months ago
Figuring out the right API for this sounds challenging, but I like the idea.
I wonder if we could think of this more broadly as a caching problem.
Basically, you could evaluate some "question" (aggregations, statistics, etc.) for all segments and save the index-level question and the per-SegmentCommitInfo
answers to disk. When a new segment gets flushed (or produced by a merge), we could eagerly evaluate the question against the new segment. (We would probably also need to reevaluate for segments that have new deletes, and maybe updated doc values.)
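The per-segment "question cache" idea above can be sketched in a few lines. This is a toy illustration, not Lucene code: the `Segment` record and its `id()` are hypothetical stand-ins for `SegmentCommitInfo` identity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration of the per-segment "question cache" (not Lucene code). */
class SegmentQuestionCache {
  // Cached answer per segment, keyed by a segment identity token; in Lucene this
  // could be derived from the SegmentCommitInfo (name plus delete generation).
  private final Map<String, Long> answers = new HashMap<>();

  /** A stand-in for a segment: identity plus the values the "question" reads. */
  record Segment(String name, long delGen, long[] values) {
    String id() { return name + "_" + delGen; } // delGen changes when deletes land
    long computeSum() { long t = 0; for (long v : values) t += v; return t; }
  }

  /** Evaluate a question (here: a sum) across segments, reusing cached answers. */
  long evaluate(List<Segment> segments) {
    long total = 0;
    for (Segment s : segments) {
      // Re-evaluate only for segments we have not seen, e.g. ones freshly
      // flushed or produced by a merge (or whose deletes changed).
      total += answers.computeIfAbsent(s.id(), k -> s.computeSum());
    }
    return total;
  }
}
```

A segment whose deletes change gets a new identity token, which naturally triggers the re-evaluation mentioned above.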
There are several advantages to keeping the new index as part of the same Lucene segment. It reduces maintenance overhead and enables Near Real-Time (NRT) use cases. Specifically, for the star tree index, incrementally building the star tree as segments get flushed and merged takes significantly less time, as the sort and aggregation operations can be optimized.
Considering these advantages, I'm further exploring the idea of a new format to support multi-field indices, which can also be extended to create other types of composite indices.
Since Lucene is also used for OLAP use cases, we can create a 'DataCubesFormat' specifically designed to create multi-field indices on a set of dimensions and metrics. [Preferred]
Alternatively, if we want a more generic format for creating indices based on any set of fields, we could go with 'CompositeValuesFormat'.
While the underlying implementation for both formats would be similar (creating indices on a set of Lucene fields), 'DataCubesFormat' is more descriptive and tailored to the OLAP use case.
For clarity, we will focus on 'DataCubesFormat' in the rest of this section.
Broadly, we have two ways to implement the format.
Pros
Cons
Users can pass the set of dimensions and metrics as part of a new 'DataCubeField' during the 'ProcessDocument' flow.
Pros
Cons
Overall, the preferred approach of using 'IndexWriterConfig' and 'SegmentInfo' seems more suitable for implementing the 'DataCubesFormat'.
public abstract class DataCubesFormat implements NamedSPILoader.NamedSPI {
/**
* Returns producer to read the data cubes from the index.
*/
public abstract DataCubesProducer<?> fieldsProducer(SegmentReadState state) throws IOException;
/**
* Returns a DocValuesConsumer to write a 'DataCubes' index based on doc values.
*/
public abstract DataCubesDocValuesConsumer fieldsConsumer(
SegmentWriteState state, DataCubesConfig dataCubesConfig) throws IOException;
}
public class DataCubesConfig {
public List<DataCubeField> getFields() {
// Implementation
}
// Additional configuration options as needed
}
public abstract class DataCubeField {
  public abstract List<Dimension> getDimensions();

  public abstract List<Metric> getMetrics();

  public static class DataCubeFieldProvider {
    public DataCubeField readDataCubeField(DataInput in) {
      // Implementation
    }

    public void writeDataCubeField(DataCubeField dataCubeField, DataOutput out) {
      // Implementation
    }
  }
}
public class Dimension {
public String fieldName;
// Additional dimension-specific options
}
public class Metric {
public String fieldName;
public AggregationFunction aggregationFunction;
// Additional metric-specific options
}
The DataCubesDocValuesConsumer reads the DocValues data from the DocValues writers and creates the new indices based on the DataCubesConfig.
The DataCubesProducer/Reader is used to read the 'DataCubes' index from the segment.
In this example, I've implemented the 'StarTree' index by extending 'DataCubesFormat':
{
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig().setCodec(Codec.forName("Lucene95"));
// Supply the dimensions and metrics fields to form the data cube config
Set<String> dims = Set.of("clientip", "targetip");
Set<String> metrics = Set.of("status");
StarTreeDataCubeField field = new StarTreeDataCubeField("datacube-1", dims, metrics);
StarTreeDataCubeField[] fieldArr = new StarTreeDataCubeField[]{field};
StarTreeCubeConfig starConfig = new StarTreeCubeConfig(fieldArr);
iwc.setDataCube(starConfig);
IndexWriter writer = new IndexWriter(dir, iwc);
int numDoc = 10;
for (int i = 0; i < numDoc; i++) {
add(writer);
}
writer.commit();
writer.close();
IndexReader reader = DirectoryReader.open(dir);
// read the DataCubeValues from the underlying index
for (LeafReaderContext leafReaderContext : reader.leaves()) {
DataCubeValues<?> dataCubeValues =
leafReaderContext.reader().getDataCubeValues(DATA_CUBE_FIELD);
// todo : implementation
}
reader.close();
dir.close();
}
Wow! Adding data cube (OLAP) capabilities to Lucene could be really powerful.
Adding it as a new format does sound like the right idea to me.
I would like to better understand how DataCubeField would interact with, say, NumericDocValues or SortedSetDocValues. In theory, I would like to compute dimensional metrics based on a numeric field without explicitly needing to "add it twice".
I'm thinking, e.g., of adding an IntField to a bunch of documents, and it could either be used as a dimensional field (where the dimension could be derived by mapping the value to a range bucket) or it could contribute a metric (where e.g. I would like to know the min/max/average value of the field subject to dimensional constraints). Similarly, I would like to be able to add a KeywordField and use its value as a dimension.
Could something like that be achieved by adding a "datacube dimensionality" attribute to a field, similar to how KeywordField is both an indexed string field and a SortedSetDocValuesField?
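To make this "datacube dimensionality" idea concrete, here is a self-contained toy sketch (all names are hypothetical, not Lucene APIs) of how a single numeric field value could both derive a range-bucket dimension and feed a metric accumulator, without the document adding the field twice:

```java
/**
 * Toy sketch of a "datacube dimensionality" attribute: one numeric field value
 * contributes both a derived dimension (a range bucket) and a metric, without
 * the document adding the field twice. All names here are hypothetical.
 */
class DataCubeDimensionality {
  /** Derive a dimension from a numeric value by mapping it to a range bucket. */
  static String rangeBucket(long value, long bucketWidth) {
    long lo = Math.floorDiv(value, bucketWidth) * bucketWidth;
    return "[" + lo + "," + (lo + bucketWidth) + ")";
  }

  /** The same value can also feed a metric, e.g. a running (min, max, sum). */
  static long[] accumulate(long[] minMaxSum, long value) {
    return new long[] {
      Math.min(minMaxSum[0], value),
      Math.max(minMaxSum[1], value),
      minMaxSum[2] + value
    };
  }
}
```

The attribute on the field would then simply record which of these two roles (or both) the field plays in a given cube.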
Thanks for the comments @msfroh .
Good idea — we could supply the dimension and metric values to the DataCubesWriter as part of the addDocument flow and consume them similar to other formats. But there are some challenges:

- The same field (an IntField, for example) can be part of both a dimension and a metric (in fact, multiple metrics) within a DataCubeField. And the same IntField can be part of multiple DataCubeFields.
- If we maintain a separate DataCubesWriter for each DataCubeField, there will be duplicate values depending on the configuration.

So, in order to avoid duplicating values, how about we derive the values of a DataCubeField from the original values of the DocValuesWriter during flush?

1. The IntField values will already be part of the DocValuesWriter, so we can supply a DataCubesConsumer and keep track of the resultant values. During flush, in a new method writeDataCubes, we supply the dataCubeDocValuesConsumer to docValuesWriter.flush:
// For all doc values fields
if (perField.docValuesWriter != null) {
  if (dataCubeDocValuesConsumer == null) {
    // lazy init
    DataCubesFormat fmt = state.segmentInfo.getCodec().dataCubesFormat();
    dataCubeDocValuesConsumer = fmt.fieldsConsumer(state, dataCubesConfig);
  }
  perField.docValuesWriter.flush(state, sortMap, dataCubeDocValuesConsumer);
}
// This creates the dataCubes indices
dataCubeDocValuesConsumer.flush(dataCubesConfig);
`DocValuesWriter.flush` calls the respective `addNumericField` / `addSortedSetField` on the supplied consumer.
2. Then, in the `DataCubesDocValuesConsumer`, we keep track of the fields and the associated doc values. In flush, we can make use of the `DocValues` for each `DataCubeField`:
public class DataCubeDocValuesConsumer extends DocValuesConsumer {
  Map<String, NumericDocValues> numericDocValuesMap = new ConcurrentHashMap<>();
  Map<String, SortedSetDocValues> sortedSetDocValuesMap = new ConcurrentHashMap<>();

  @Override
  public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
    sortedSetDocValuesMap.put(field.name, valuesProducer.getSortedSet(field));
  }

  @Override
  public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
    numericDocValuesMap.put(field.name, valuesProducer.getNumeric(field));
  }

  public void flush(DataCubesConfig dataCubesConfig) throws IOException {
    for (DataCubeField field : dataCubesConfig.getFields()) {
      for (String dim : field.getDims()) {
        // Get doc values from the map (we can get a clone / singleton)
        // Custom implementation over docValuesIterator
      }
      for (String metric : field.getMetrics()) {
        // Get doc values from the map
        // Custom implementation over docValuesIterator
      }
    }
  }
}
### Merge
During merges, we will most likely not need `DocValues`; instead, the merge will operate on the `DataCubeIndices` and associated structures.
POC [code](https://github.com/bharath-techie/lucene/commit/d4455221becca86c039236f4a730066626207870#diff-24dc83bf177eafec2219cdc119f0eaae8c52da9a949973d8045fdef49a0de16e)
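As a toy illustration of why merging pre-aggregated cube structures is cheap (hypothetical names, not the POC code): each segment's cube rows are already sorted by their dimension tuple, so a merge is a linear merge plus re-aggregation of equal tuples.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy merge of two cube segments. A row is (dimKey, metricSum); rows within a
 * segment are already sorted by dimKey, so merging is a linear pass.
 */
class CubeMerge {
  record Row(String dimKey, long sum) {}

  static List<Row> merge(List<Row> a, List<Row> b) {
    List<Row> out = new ArrayList<>();
    int i = 0, j = 0;
    while (i < a.size() || j < b.size()) {
      Row next;
      // Take the smaller dimension key next, like a merge-sort step.
      if (j >= b.size()
          || (i < a.size() && a.get(i).dimKey().compareTo(b.get(j).dimKey()) <= 0)) {
        next = a.get(i++);
      } else {
        next = b.get(j++);
      }
      // Re-aggregate rows that share the same dimension tuple.
      int last = out.size() - 1;
      if (last >= 0 && out.get(last).dimKey().equals(next.dimKey())) {
        out.set(last, new Row(next.dimKey(), out.get(last).sum() + next.sum()));
      } else {
        out.add(next);
      }
    }
    return out;
  }
}
```

No re-sort of raw documents is needed, which is the cost advantage over rebuilding the cube from scratch at merge time.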
It's not clear to me how we'd take advantage of this information at search time. What changes would we make to e.g. Collector to allow it to take advantage of these new data structures?
Hi @jpountz ,
Good question. If we take StarTreeDataCube as an example implementation of the above format: we will traverse the StarTree and the StarTreeDocValues (dimensionDocValues, if needed) during the query to get the resultant docIdSet.
@Override
public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
throws IOException {
return new ConstantScoreWeight(this, boost) {
@Override
public Scorer scorer(LeafReaderContext context) throws IOException {
StarTreeAggregatedValues starTreeValues = null;
DocIdSetIterator result = null;
DataCubeValues<?> dataCubeVals = context.reader().getDataCubeValues(field);
Object dataCubeValues = dataCubeVals.getDataCubeValues();
if (dataCubeValues != null) {
starTreeValues = (StarTreeAggregatedValues) dataCubeValues;
StarTreeFilter filter = new StarTreeFilter(starTreeValues, filterPredicateMap, groupByCols);
// Traverse star tree and get the resultant DocIdSetIterator of star tree + star tree doc values
result = filter.getStarTreeResult();
}
return new ConstantScoreScorer(this, score(), scoreMode, result);
}
@Override
public boolean isCacheable(LeafReaderContext ctx) {
return false;
}
};
}
And coming to the collector changes: the collector needs to traverse the StarTreeDocValues to get the values for the associated DocIds.
For example, the collect override for a sum aggregation will look like the below, where we traverse the associated metricDocValues.
@Override
public void collect(int doc)
throws IOException {
docsBuilder.grow(1).add(doc);
DataCubeValues<?> dataCubeVals = context.reader().getDataCubeValues(field);
Object dataCubeValues = dataCubeVals.getDataCubeValues();
if(dataCubeValues != null) {
StarTreeAggregatedValues starAggs = (StarTreeAggregatedValues) dataCubeValues;
NumericDocValues dv = starAggs.metricValues.get("status_sum");
dv.advanceExact(doc);
sum += dv.longValue();
}
totalHits++;
}
private void queryWithFilter(IndexWriter w)
throws IOException {
final IndexReader reader = DirectoryReader.open(w);
final IndexSearcher searcher = newSearcher(reader, false);
Set<String> groupByCols = new HashSet<>();
//groupByCols.add("day");
Map<String, List<Predicate<Integer>>> predicateMap = new HashMap<>();
List<Predicate<Integer>> predicates = new ArrayList<>();
predicates.add(day -> day > 2 && day < 5);
predicates.add(day -> day == 30);
predicateMap.put("day", predicates);
predicates = new ArrayList<>();
predicates.add(status -> status == 200);
predicateMap.put("status", predicates);
final Query starTreeQuery = new StarTreeQuery(predicateMap, groupByCols);
searcher.search(starTreeQuery, getStarTreeSumCollector());
reader.close();
}
Code reference: this contains the star tree implementation, but it is old code that I have not yet integrated with the DataCubes format etc. There I followed Approach 2 (Extend DocValues format), but the query/collect logic should remain similar.
This reminded me of an older issue: https://github.com/apache/lucene/issues/11463 that seems to have foundered. Maybe there is something to be learned from that, not sure.
Thanks for the inputs @msokolov . I do see the similarities, but the linked issue seems to be tied to rollups done as part of merges, aided by index sorting on the dimensions. Index sorting is quite expensive.
The difference here is that all the computation is deferred to the format and its custom logic. And the query-time gains could be higher, as we are using efficient cubing structures.
For the star tree implementation, the algorithm sorts the dimensions and then aggregates during flush; successive merges just need to sort and aggregate the compacted, sorted data cube structures. So, w.r.t. performance, if the dimensions are of relatively low cardinality, there is minimal impact on index-append throughput (< 2%), as the difference is mainly due to write threads helping during flush (reference in the OpenSearch RFC).
There are some cons here as well:
Let me know your thoughts.
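The sort-then-aggregate flush step described above can be sketched as a self-contained toy (hypothetical names, no Lucene types): documents are sorted by their dimension tuple, then adjacent rows with equal dimensions are collapsed into one aggregated row.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy sketch of the star-tree flush: sort by dimensions, then aggregate. */
class FlushAggregate {
  record Doc(String[] dims, long metric) {}
  record Row(String key, long sum, long count) {}

  static List<Row> sortAndAggregate(List<Doc> docs) {
    // Sort by the concatenated dimension tuple (a toy stand-in for the
    // dimension-ordinal sort the real implementation would do).
    List<Doc> sorted = new ArrayList<>(docs);
    sorted.sort(Comparator.comparing(d -> String.join("|", d.dims())));

    // Collapse adjacent rows with equal dimensions into one aggregated row.
    List<Row> rows = new ArrayList<>();
    for (Doc d : sorted) {
      String key = String.join("|", d.dims());
      int last = rows.size() - 1;
      if (last >= 0 && rows.get(last).key().equals(key)) {
        Row prev = rows.get(last);
        rows.set(last, new Row(key, prev.sum() + d.metric(), prev.count() + 1));
      } else {
        rows.add(new Row(key, d.metric(), 1));
      }
    }
    return rows;
  }
}
```

With low-cardinality dimensions, the output row count is much smaller than the document count, which is where the query-time and merge-time savings come from.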
My main concern was to ensure this exciting effort didn't get blocked by the need to do major changes to existing indexing workloads. It sounds like the plan here is less intrusive and confined to the new format, for which +1
At first sight I don't like the fact that this seems to plug in a whole new way of doing things. Either you don't use a star tree index and you do things the usual way with filters and collectors, or you want to use a star tree index and then you need to craft queries in a very specific way if you want to be able to take advantage of the optimization for aggregations. Since this optimization is about aggregating data, I'd like this to mostly require changes on the collector side from the end user's perspective.
It would be somewhat less efficient, but an alternative I'm contemplating would consist of the following:
- Introduce a new LeafCollector#collectRange(int docIdStart, int docIdEnd) API for these queries to use.
- Implement the aggregation logic on top of LeafCollector#collectRange.
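A rough sketch of what such a collectRange hook might look like (hypothetical interface, not part of Lucene today): a default implementation falls back to doc-at-a-time collection so existing collectors keep working, while aggregation-aware collectors can override it to consume whole ranges at once.

```java
/** Hypothetical sketch of the proposed LeafCollector#collectRange extension. */
interface RangeLeafCollector {
  void collect(int doc);

  /**
   * Default falls back to doc-at-a-time collection, so existing collectors
   * keep working; [docIdStart, docIdEnd) is assumed half-open here.
   */
  default void collectRange(int docIdStart, int docIdEnd) {
    for (int doc = docIdStart; doc < docIdEnd; doc++) {
      collect(doc);
    }
  }
}
```

An aggregation collector could override collectRange to add a precomputed per-range aggregate instead of touching each doc, which is where the speedup would come from.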
Thanks for the inputs @jpountz . Let me spend some more time on this.
But this is a topic we had thought about as well, and one idea was to do query abstraction / planning. Let me know your thoughts:
Can the concern with the query be solved by abstracting it away, introducing a DataCubeOrOriginalQuery similar to IndexOrDocValuesQuery? In fact, the input can remain the same as the original query; we can check whether it can be solved via the DataCubesIndex.
Terms / multi-term / range / boolean queries and their associated fields are something we can parse to check whether any DataCubeField contains the same fields as dimensions; if so, we can pick the optimal DataCubeField and form the associated query in the background based on cost. Or we can also think of query rewriting, if a particular query can be solved using DataCubes.
But there is still an issue with collection: the user needs to pass two collector implementations, and the right one needs to be picked based on which query path is taken. We can see how to better implement this if the above idea works.
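The planning step of such a DataCubeOrOriginalQuery could be sketched like this (a toy with hypothetical names, by analogy with IndexOrDocValuesQuery): fall back to the original query unless some configured cube indexes every field the query touches as a dimension.

```java
import java.util.List;
import java.util.Set;

/** Toy sketch of the cube-vs-original planning check (hypothetical names). */
class DataCubeOrOriginalQueryPlan {
  /** A cube can answer the query only if it indexes every referenced field as a dimension. */
  static boolean cubeCovers(Set<String> queryFields, Set<String> cubeDims) {
    return cubeDims.containsAll(queryFields);
  }

  /**
   * Pick the first covering cube, or return null to signal falling back to the
   * original query with its usual filters and collectors. A real planner would
   * also compare costs across covering cubes.
   */
  static Set<String> choosePlan(Set<String> queryFields, List<Set<String>> cubes) {
    for (Set<String> dims : cubes) {
      if (cubeCovers(queryFields, dims)) {
        return dims;
      }
    }
    return null; // fall back to the original query path
  }
}
```

The collector-selection problem mentioned above would sit right after this check: whichever branch is taken determines which collector implementation runs.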
Description
We are exploring the use case of building materialized views for certain fields and dimensions using Star Tree index while indexing the data. This will be based on the configured fields (dimensions and metrics) during index creation. This is inspired from http://hanj.cs.illinois.edu/pdf/vldb03_starcube.pdf and Apache Pinot’s Star Tree index. Star Tree helps to enforce upper bound on the aggregation queries ensuring predictable latency and resource usage, it is also storage space efficient and configurable. OpenSearch RFC : https://github.com/opensearch-project/OpenSearch/issues/12498
Creating this issue to discuss approaches to support Star Tree in Lucene and also to get feedback on any other approaches/recommendations from the community.
Quick overview on Star Tree index creation flow
The Star Tree DocValues fields and Star Tree index are created during the flush / merge flows of indexing
Flush / merge flow
Challenges
The main challenge is that the ‘StarTree’ index is a multi-field index, unlike the other formats in Lucene / OpenSearch. This makes it infeasible to use the PerField extensions defined in Lucene today. We explored ‘BinaryDocValues’ to encode dimensions and metrics, but the ‘types’ of dimensions and metrics differ, so we couldn’t find a way to extend it. [Dimensions could be numeric, text, or a combination.]
Create Star Tree index
Approach 1 - Create a new format to build materialized views
We can create a new dedicated file format (similar to the points format or postings format) for materialized views, which accepts a list of dimensions and metrics; the default implementation for it could be the Star Tree index.
Pros
Cons
Approach 2 - Extend DocValues format
Indexing - Extend DocValues to support materialized views
We can extend the DocValues format to support a new type of field, ‘AGGREGATED’, which will hold the list of dimensions and metrics configured by the user during index creation.
During flush / merge, the values of the dimensions and metrics will be read from the associated ‘DocValues’ fields using a DocValuesProducer, and we will create the Star Tree indices as per the steps mentioned above.
Search flow
We can extend ‘LeafReader’ and ‘DocValuesProducer’ with a new method ‘getAggregatedDocValues’ to get the Star Tree index during query time. This retrieves the root of the Star Tree and the dimensions and metrics DocValues fields.
Pros
Cons
Open questions
Any suggestions on a way to pack the values of ‘dimensions’ and ‘metrics' as part of an ‘AggregatedField’ during indexing in the ‘addDocument’ flow? Also, should we explore this, or can we simply create the derived ‘AggregatedField’ during flush/merge?
Create Star Tree DocValues fields
The Star Tree index is backed by Star Tree DocValues fields, so to read/write them we can reuse the existing ‘DocValuesFormat’. Each field is stored as a ‘Numeric’ DocValues field, or a ‘SortedSet’ DocValues field in the case of text fields.
To accommodate this, we propose to make DocValuesFormat extend ‘Codec’ and ‘Extension’, so that we can create the Star Tree DocValues fields with custom extensions.