opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

TimeSeries optimizations in OpenSearch #3734

Open rishabhmaurya opened 2 years ago

rishabhmaurya commented 2 years ago

Is your feature request related to a problem? Please describe.
Dedicated architecture for TimeSeries in OpenSearch to offer lower cost and better performance for such use cases.

Describe the solution you'd like

TSDB Design Proposal

This document proposes an alternate approach to store and query TimeSeries data in OpenSearch. It presents a definition and representation of TimeSeries data and an approach that uses the existing OpenSearch and Lucene architecture to store and query TS data. It compares storage, cost, and query performance against the existing approach OpenSearch users would take to store and query TS data. Later it proposes an even more efficient approach to store and perform range queries over TS data, with certain limitations. This document doesn't cover the performance benchmark numbers and prototype details, but it captures the high-level picture of the performance and cost benefits of the newer approach over the traditional one.

TimeSeries data definition

In TS data, the data points (or measurements/collected metrics) are associated with a timestamp and a set of dimensions, and are emitted periodically. E.g. weather data, measurements from sensor/IoT devices, stocks/finance data, host metrics, etc. In host metrics data, dimensions could be hostname, region, AZ, IP. A datapoint will contain a collection of metrics such as CPU, memory, and error count, associated with a timestamp.

Representation:

Dimensions: Set of dimensions D = {D1, D2, .. Dn} where Di is the ith dimension and N is the number of dimensions associated with any data point. N is strictly constrained here and should be under 10 in most cases.

Metrics: Set of metrics M = {M1, M2, ..} where each metric is a tuple <metric_name, metric_type>, and metric_type is one of {sum, avg, min, max, count}.

Datapoint consists of: TimeSeries TS - it is a representation of the values of all dimensions associated with a datapoint.

TSi is the timeseries associated with the ith datapoint: TSi = {DV1, DV2 .. DVn}, where DVj is the value of dimension Dj. DVj is always a low-cardinality field; the number of unique values is limited for the data to qualify as TimeSeries data. Pi, the ith datapoint, = {MV1, MV2, ..}, where MVj is the value of the jth metric in the metric collection M for the ith datapoint. Ti is the timestamp associated with the ith datapoint.

So a document is a tuple <TSi, Ti, Pi>.
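For concreteness, here is a minimal Java sketch of the <TSi, Ti, Pi> tuple described above; the type and field names are illustrative assumptions, not part of the proposal.

```java
import java.util.Map;

// Hypothetical illustration of the <TSi, Ti, Pi> tuple; names and types are
// assumptions for readability, not part of the proposal.
record Datapoint(
        Map<String, String> dimensionValues,  // TSi: dimension name -> low-cardinality value
        long timestampMillis,                 // Ti
        Map<String, Double> metricValues) {   // Pi: metric name -> numeric value
}

class DatapointExample {
    public static void main(String[] args) {
        Datapoint p = new Datapoint(
                Map.of("hostname", "host-1", "region", "us-west-2"),
                System.currentTimeMillis(),
                Map.of("cpu", 42.5, "memory", 73.1));
        System.out.println(p);
    }
}
```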

fig. 1: Timeseries-1; fig. 2: Timeseries-2 (figures omitted)

Traditional approach

The traditional approach in OpenSearch is based on creating DataStreams and indexing documents with a mandatory @timestamp field, mapped as a date. A dimension field as defined above, Di, can be mapped as numeric, keyword, IP, or a similar type. A metric field Mi will be mapped as a numeric field.

DataStreams have hidden backing indices, each associated with a time range; depending on its timestamp, a document is routed to the right backing index. An IndexManagement policy can be defined to automatically create a new backing index when desired.

Indexing into a data stream is similar to any other index. A doc is stored as-is in the segment with all its fields, in the order documents are ingested unless an index sort order is specified by the user.

Query types in TS data

Queries over TS data will be of the following nature:

QTs-Te (Filter(D={DV1, DV2 .. DVn}), M={M1, M2})

QTs-Te (Filter(D={DV1, DV2}), Agg(D={D3}), M={M1, M2})

Ts is the start time and Te is the end time. Filter represents a filter query and Agg represents an aggregation on a certain field. Each query will output an aggregated result of the metrics M requested in the query, e.g. min, max, avg on a metric field.

Performance on each segment

A query is a combination of AND conjunctions on all dimensions, a range query on the timestamp, and metric aggregations on the requested metrics. Assuming there is no sort order, and given that a dimension value is a low-cardinality field, the filter query could potentially match a very high percentage of documents. Scanning through each of them to look up a metric field or aggregate on another dimension, in a non-ordered way, would require a tremendous amount of disk IO to fetch documents. Placing similar documents together has 2 benefits: faster queries and better compression.
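For illustration, the traditional query shape described above roughly corresponds to a Lucene boolean conjunction of term filters on the dimension fields plus a point range on the timestamp. A hedged sketch (field names and values are assumptions):

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;

class TraditionalTsQuery {
    // AND of exact matches on each dimension, plus a range on @timestamp.
    static Query build(long ts, long te) {
        return new BooleanQuery.Builder()
                .add(new TermQuery(new Term("hostname", "host-1")), BooleanClause.Occur.FILTER)
                .add(new TermQuery(new Term("region", "us-west-2")), BooleanClause.Occur.FILTER)
                .add(LongPoint.newRangeQuery("@timestamp", ts, te), BooleanClause.Occur.FILTER)
                .build();
    }
}
```

Each clause is resolved against a separate index structure, and the matching docs are visited in docID order rather than grouped by series, which is the IO problem described above.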

Proposal

Tenets for TS data:

  1. Minimize IO during queries.
  2. Better compression when segments are persisted on disk.
  3. Fast range queries and metric aggregations.
  4. Minimize memory consumption at query time.
  5. Keep aged data at coarser granularity.

All these tenets aim toward better query performance and lower cost to store and query time series data.

Proposal 1

(figure: Timeseries-3)

Creating TimeSeries and TimeSeriesID

Instead of storing all dimension values in separate fields, they can be combined into a single field of type byteref. Each of these timeseries can be referred to using an ID - TSID, generated by encoding their byteref representation.

TSID = encode(byteref(D1V1, DiVk .. DnVm))

Instead of indexing all dimensions, just TSID can be indexed, which can be decoded later if needed at query time.

Note: The number of TSIDs will not grow much in a segment, as dimension values in time series data are limited and do not explode. So TSID still remains a low-cardinality field.
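A possible encoding sketch in Java, assuming dimension name/value pairs are packed as length-prefixed UTF-8 bytes in sorted dimension order; this is one plausible scheme, not necessarily the one the prototype uses:

```java
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;

class TsidEncoder {
    // Pack (dimension name, value) pairs into a single byte sequence.
    // Sorting by dimension name makes the encoding deterministic, so equal
    // time series always map to the same TSID bytes.
    static BytesRef encode(TreeMap<String, String> dimensions) {
        BytesRefBuilder out = new BytesRefBuilder();
        for (var entry : dimensions.entrySet()) {
            append(out, entry.getKey());
            append(out, entry.getValue());
        }
        return out.toBytesRef();
    }

    private static void append(BytesRefBuilder out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.append((byte) bytes.length);      // simple length prefix (assumes values < 128 bytes)
        out.append(bytes, 0, bytes.length);
    }
}
```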

Index Sorting

The index sort order would be (TSID, timestamp).

When the index is refreshed and segments are persisted on disk, docs are ordered according to the above sort order. In this case, all similar timeseries will be kept together, and docs with the same timeseries will be ordered by timestamp.

TSIDs are encoded such that if TSIDi < TSIDj then byteref(TSIDi) < byteref(TSIDj). So all timeseries inside a segment are lexicographically sorted based on their dimension values.

Queries like QTs-Te (Filter(D={DV1, DV2 .. DVn}), M={M1, M2}), which filter on a timeseries and apply a range query on timestamps, can be executed very efficiently by iterating over the docs in docvalues in an ordered way and skipping over large batches of irrelevant docs (skipping batches of TSIDs). Also, not indexing each dimension and instead indexing just the TSID avoids AND conjunctions on dimension values.
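A minimal sketch of configuring that (TSID, timestamp) sort order at the Lucene level, assuming the TSID is stored as a SortedDocValuesField named _tsid and the timestamp as a NumericDocValuesField named @timestamp (both names are assumptions):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;

class TimeSeriesIndexConfig {
    static IndexWriterConfig build() {
        IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
        // Segments are flushed with docs ordered by (TSID, timestamp): all docs of
        // one series are adjacent, and ordered by time within the series.
        config.setIndexSort(new Sort(
                new SortField("_tsid", SortField.Type.STRING),       // backed by SortedDocValuesField
                new SortField("@timestamp", SortField.Type.LONG)));  // backed by NumericDocValuesField
        return config;
    }
}
```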

Shard Routing Strategy

The strategy should aim toward storing similar timeseries together, for better query performance and compression. Documents can be routed to shards greedily based on dimension values, keeping similar timeseries together while keeping shard sizes uniform across the index.
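One simplistic way to read this (an assumption, not the concrete design) is to derive the routing key from the leading dimension value(s) so that series sharing those dimensions co-locate:

```java
class DimensionShardRouting {
    // Simplistic stand-in for the routing decision: hash only the leading
    // dimension value(s) so series sharing those dimensions land on one shard.
    // A real greedy strategy would also track per-shard size to stay uniform.
    static int shardFor(String leadingDimensionValues, int numShards) {
        return Math.floorMod(leadingDimensionValues.hashCode(), numShards);
    }
}
```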

Rollups

Aged data will be rolled up at a coarser timestamp granularity, so ranges of timestamps can be merged by merging the summary metrics associated with their datapoints. This will be similar to rollups in index management. The configuration here would be even simpler, requiring only the time ranges and their granularity.
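To make the "merge the summary metrics" step concrete, a hedged sketch of combining two pre-aggregated buckets into a coarser one; the summary shape is an assumption:

```java
// Hypothetical pre-aggregated summary for one (TSID, time bucket).
record MetricSummary(double min, double max, double sum, long count) {

    // Rolling two adjacent time buckets into a coarser one only needs their
    // summaries, not the original data points.
    MetricSummary merge(MetricSummary other) {
        return new MetricSummary(
                Math.min(min, other.min),
                Math.max(max, other.max),
                sum + other.sum,
                count + other.count);
    }

    double avg() {
        return count == 0 ? 0.0 : sum / count;
    }
}
```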

Even faster range queries and efficient storage

The idea is to store the timestamps of each timeseries in an ordered tree that can execute range queries efficiently, without scanning over all matching timestamps in linear time. Also, when such trees are stored on disk, internal nodes should be retrievable efficiently, minimizing the need to read/load the full file in memory.

Proposal 2: Extending Points and BKD in Lucene

(figure: Timeseries-4)

This is similar to Proposal 1, but instead of using index sort to perform fast range queries on timestamps, we will be exploring the use of one-dimensional BKD trees in Lucene for range queries. Today, the internal nodes of a BKD tree don't store any information other than the file pointer, split dimension, and split value, which are required to traverse to the right leaf nodes while querying. This proposal is centered around additionally storing aggregated summaries in the internal nodes.

Assumptions:

Changes:

Changes in BKD:

  1. Maintain a single-dimension BKD tree per TSID (computed as described here). The single dimension values will be the timestamps of the points.
  2. Store aggregated summaries in the internal nodes of the BKD tree.
  3. Leaf nodes will store the docIDs for all points falling under the min/max value (timestamp) of that leaf node, same as today. Additionally, they will also store the aggregated summaries of all points belonging to the leaf node.
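A hedged sketch of how a range aggregation could use those internal-node summaries, with a hypothetical in-memory node type standing in for the on-disk BKD structure:

```java
class SummaryBkdSketch {
    // Hypothetical in-memory view of a BKD node; in Lucene the nodes live on
    // disk and are reached through file pointers rather than object references.
    record Node(long minTs, long maxTs, double sum, long count, Node left, Node right) {}

    // Sum a metric over [from, to] using precomputed node summaries. Subtrees
    // fully covered by the range contribute their summary without being
    // descended into, so only the nodes on the range boundaries are visited.
    static double sum(Node node, long from, long to) {
        if (node == null || node.maxTs() < from || node.minTs() > to) {
            return 0;                                  // disjoint range: skip subtree
        }
        if (from <= node.minTs() && node.maxTs() <= to) {
            return node.sum();                         // fully covered: use the summary
        }
        if (node.left() == null && node.right() == null) {
            // Partially covered leaf: the real implementation would scan the
            // leaf's docIDs/points here; omitted in this sketch.
            return 0;
        }
        return sum(node.left(), from, to) + sum(node.right(), from, to);
    }
}
```

A subtree fully covered by the requested range contributes its precomputed summary without being descended into, which is where the O(log N) traversal discussed under Performance comes from.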

Timeseries-related changes:

The timeseries will also be stored in sorted order as SortedDocValues, so when filtering on timeseries they can be retrieved in an ordered way. A new query type will be introduced which can output aggregation summaries on metrics given a timeseries and a range of timestamps.

Lucene changes:

Changes pertaining to BKD with summaries can be found here - https://github.com/rishabhmaurya/lucene/pull/1

Performance

This approach will change the time complexity of tree traversal from O(N) to O(log N), where N is the number of hits for a given timeseries query. It also avoids visiting doc values to compute aggregation summaries, using the precomputed aggregation summaries instead.

Cons

All the assumptions made above are cons here; the question is whether it is fine to live with these assumptions for timeseries use cases.

PTRangeOrDocValues query

This can be an additional feature for power users who are willing to bear the cost of storing timeseries data using both approaches. PTRangeOrDocValues can help those users estimate the cost of running queries using both models and choose the optimal one.

These power users will also not lose the feature flexibility offered by approach 1 and not by approach 2, but will not be as optimal in terms of performance when compared to approach 1.


penghuo commented 2 years ago

@rishabhmaurya Nice doc!

One question: how do we propose to query the TSDB? Taking examples from PromQL, let's say the http_requests_total metric has 4 dimensions (job, group, stage, method). 1. We want to select a particular timeseries from the http_requests_total metric, e.g. http_requests_total{job="prometheus",group="canary"}. 2. We want to aggregate with sum by (job, group) (http_requests_total).

Correct me if I am wrong, my understanding is TSID will help only when we do exact selection and rollup aggregation.

  1. search for an exact match of all the dimensions http_requests_total{job="prometheus",group="canary",stage=prod, method='GET'}.
  2. sum(http_requests_total, 7 days)
rishabhmaurya commented 2 years ago

Good question; I'll be updating the doc for a better understanding of the logic.

  1. we want to select a particular timeseries from the http_requests_total metric, e.g. http_requests_total{job="prometheus",group="canary"}.

The idea is to perform a linear scan over the matching TSIDs instead of each doc. So, in this case, all TSIDs matching {job="prometheus",group="canary"} will be iterated over. Let's take the best case: job is dimension_1 and group is dimension_2, so we know anything starting with TSID(encode(dimension_1=prometheus) + encode(dimension_2=canary)) will match the filter in the query, and we can skip over the rest of the TSIDs. The worst case would be when they are the last 2 dimensions in the TSID encoding; in that case too, the iterator.advance() would be smart enough to skip over all irrelevant TSIDs in batches.
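A hedged sketch of that batch skipping using Lucene's ordered terms enumeration, assuming the TSID is indexed under a field named _tsid (the field name and helper are illustrative):

```java
import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;

class TsidPrefixScan {
    // Visit every TSID that starts with `prefix` (e.g. the encoded bytes of
    // job=prometheus,group=canary). Because TSIDs are stored in sorted order,
    // seekCeil jumps straight to the first candidate and iteration stops at
    // the first term past the prefix, skipping everything else.
    static void forEachMatchingTsid(LeafReader reader, BytesRef prefix) throws IOException {
        Terms terms = reader.terms("_tsid");
        if (terms == null) return;
        TermsEnum te = terms.iterator();
        if (te.seekCeil(prefix) == TermsEnum.SeekStatus.END) return;
        for (BytesRef tsid = te.term(); tsid != null; tsid = te.next()) {
            if (!StringHelper.startsWith(tsid, prefix)) break; // past the prefix range
            // ... fetch postings / doc values for this TSID here ...
        }
    }
}
```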

  1. we want to aggregate on sum by (job, group) (http_requests_total).

This is easy and the logic remains the same as in aggregation today. If we use proposal 1, we get all docIDs matching the TSIDs and the aggregation logic remains the same: the DocIdSetIterator will be passed to the aggregation phase, which can form the right buckets, keep aggregating, and use DV as needed.
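A rough sketch of that aggregation step for a single metric, assuming the metric value was written as double bits in a NumericDocValuesField (field and class names are assumptions):

```java
import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;

class MetricSumAggregator {
    // Consume the matching docs (already ordered by (TSID, timestamp) thanks to
    // the index sort) and sum a metric from doc values.
    static double sum(LeafReader reader, DocIdSetIterator matchingDocs, String metricField)
            throws IOException {
        NumericDocValues values = reader.getNumericDocValues(metricField);
        double total = 0;
        for (int doc = matchingDocs.nextDoc();
             doc != DocIdSetIterator.NO_MORE_DOCS;
             doc = matchingDocs.nextDoc()) {
            if (values != null && values.advanceExact(doc)) {
                // Assumes the metric was stored as double bits (e.g. DoubleDocValuesField).
                total += Double.longBitsToDouble(values.longValue());
            }
        }
        return total;
    }
}
```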

downsrob commented 2 years ago

If your TSID is based on n dimensions, will you only be able to support queries which filter based on all n dimensions? And will the queries for all dimensions need to be exact matches? I would imagine that you lose the ability to support range queries on numeric dimensions after the dimensions are encoded.

rishabhmaurya commented 2 years ago

If your TSID is based on n dimensions, will you only be able to support queries which filter based on all n dimensions?

No, as explained in the example above, one can filter on a subset of dimensions and aggregate on others. All TSIDs matching the dimension filters in the query will be iterated over.

And will the queries for all dimensions need to be exact matches? I would imagine that you lose the ability to support range queries on numeric dimensions after the dimensions are encoded.

If it's a numeric field with high cardinality, then it's not a dimension in time series. Usually, a dimension represents some aspect/feature of the device/machine emitting logs; it doesn't make a lot of sense for it to be numeric with range queries on it. Also, we will only support specific field types for dimensions, like keyword, integer, date, byte, etc., and there will be a limit on the number of unique values.

@downsrob Let me edit the doc with a more detailed explanation; it's kind of confusing at present.

muralikpbhat commented 2 years ago

Really good proposal. Essentially, the approach can be summarized as grouping the metrics by dimensions and restricting the filters to only those. This would allow us to use one index rather than querying 3 other formats like points, termdict, and docvalues. I know this comes with some restrictions, but it can definitely be a performance gain for particular use cases. I like the second proposal with the BKD approach for maximizing performance.

I would like to see more details on:

  1. How do we identify which TSIDs to include based on the dimensions queried? Don't we need another index for that?
  2. Why can't we achieve the same thing today by flattening the fieldname (d1v1_d2v2_m1)? (I understand proposal 2 has the stats optimization.)
  3. Are we thinking of one index file per metric, or is there a way to encode multiple TSIDs into the same index file?
  4. Since this is restricting the filters to only dimensions, are we losing all other benefits of OpenSearch/Lucene? If so, what is the benefit compared to other traditional Timeseries Databases?

more granular comments inline below

Set of dimensions D = {D1, D2, .. Dn} where Di is the ith dimension. N is the number of dimensions associated with any data point. N here has very strict constraint and should be under 10 in most of the cases.

Seems like we should not restrict it to 10.

set of metrics M = {M1, M2 .. } each metric is a tuple of <metric_name, metric_type>, metric_type could be one of {sum, avg, min, max, count}.

Not very clear why we need metric_type. If it is for the pre-compute optimization, it is better named metric_aggregation_target_type. Also, why should it be one instead of many? In most cases the same metric might be queried for different aggregations.

{sum, avg, min, max, count}.

Why not percentiles? It will be critical to support.

TSi = {DV1, DV2 .. DVn} where DVj is the dimension value of Dj.

Is it better represented as {D1:V1, D2:V2 .. Dn:Vn}? The current one seems to indicate we are storing ordered values only. While it is a good optimization, it might be too restrictive and may not allow addition of dimensions.

Traditional approach in OpenSearch is based on creating DataStreams and indexing documents with mandatory @timestamp field, mapped as a date.

I think the new proposal also requires datastream or index rotation. So probably we can just focus on the problems of traditional approach rather than distracting with datastream. We also need to include how we do range queries across index boundaries.

Each query will output an aggregated result of metrics M requested in the query. for e.g. min, max, avg on metric field.

It is possible that some queries might need just row data instead of the stats.

TSID = encode(byteref(D1V1, DiVk .. DnVm))

Don't we need to have the metric name also in the TSID? Or do we just use the TSID as a container of all the metrics, with one index file per TSID?

Instead of indexing all dimensions, just TSID can be indexed, which can be decoded later if needed at query time.

How do we filter if only subset of dimensions are passed during query time? Need another index structure it seems.

can be very efficiently executed by iterating over the docs in docvalues in an ordered way and skipping over large batches of the irrelevant docs

If the TSID contains multiple metrics, and if it is ordered by timestamp, then this ordering won't work. This is why I thought the TSID would actually have the metric name as well. Please confirm.

Shard Routing Strategy

Are you suggesting using the TSID as the routing field? Call it out explicitly.

Rollups

Does it still require ISM or is there a way to do this inplace within the index?

BKD proposal

Are the pre-computed stats in internal nodes the main benefit (seems like a big win)? Are there any other differences between proposals 1 and 2 if we ignore this optimization?

• Timeseries documents having fields other than dimensions, timestamps and metrics is not common, so queries will only support filtering and aggregation on dimensions, range query on timestamps and just statistics aggregations on metrics.

Isn't that true in the case of proposal 1 as well? If you support filtering on a non-dimension field with docvalues, you need to handle the skips in docvalues and you still have the original I/O issue.

• Append only use case, so document deletion will not be supported. (This can be supported but with some performance hits).

That is the case with time series anyway

PTRangeOrDocValues query

Nice thought, similar optimization exists today.

penghuo commented 2 years ago

More thoughts on aggregation.

TopK over multiple dimensions is a very common use case in metrics queries. But compared with terms aggregation, the OpenSearch multi-terms aggregation is slow. Based on the perf results, hash(d1, d2, ... dN) is the major contributor. TSID matching solves the problem of generating the docID set, but it may not help solve the TopK problem.

  1. we want to aggregate on sum by (job, group) (http_requests_total).

this is easy and the logic remains the same as in aggregation today. If we use proposal 1, we get all docIDs matching TSIDs and aggregation logic remain the same - the DocIDSetIterator will be passed to the aggregation phase which can form the right buckets and keep aggregating and use DV as needed.

rishabhmaurya commented 2 years ago

Thanks @muralikpbhat, you got the crux right; these are some good questions and I'll try to update the doc to make them clearer.

How do we identify which TSDs to include based on the dimensions queried? Don't we need another index for that?

At query time as well, we will encode the dimensions into their bytesref representation. If a query filters on partial dimensions, then the full TSID cannot be generated, so in this case all TSIDs matching the partial TSID will be iterated over. We do not have to perform a slow (linear) advance when iterating over matching TSIDs; instead, since TSIDs are ordered when indexed, we can skip over batches of non-matching TSIDs by performing a binary search for the next matching TSID.

Why can’t we achieve the same thing today by flattening the fieldname (d1v1_d2v2_m1)? (i understand proposal 2 has stat optimization )

Flattening could take a lot of space, and even checking for a matching TSID would take time on the order of the size of all dimension values associated with a metric log. Loading all of them in memory, when needed, would be overkill too. Instead, encoding the dimension values and then storing them in bytesref format will be efficient and may yield better compression results too. I see it as very similar to using ordinals vs actual strings in aggregations; ordinals are much faster and use a lot less memory.

Are we thinking of one index file per metric or is there a way to encode multiple TSD into same index file?

In proposal 2, where we use points indexed on timestamp, each TSID will map to a set of points. Today, points are stored in BKD trees in Lucene, and even if there are 5 fields of type Point in the document, there will still be only 3 standard files (kdm, kdi, kdd) created for all the point fields, and they will share the IndexInput/IndexOutput streams. Each point field in the stream is separated by its field number. Also, we store the leaf file pointers in the index stream and the index file pointer in the metadata stream, so we can seek to the file pointers directly at query time. The answer is "one index file". If we treat each TSID as a separate field (that's how I'm prototyping it), then all BKDs will share the same files (kdm, kdi, kdd).

Since this is restricting the filters to only dimensions, are we losing all other benefits of OpenSearch/Lucene? If so, what is the benefit compared to other traditional Timeseries Databases?

I'm glad you brought this up, as it is important to list the benefits over other timeseries databases. I do not have an answer yet, but I do agree we are losing a bunch of capabilities with Proposal 2. But again, it can be used in conjunction with Proposal 1, where we aren't losing any of them. So if a user keeps both indexes, queries without filters on non-dimension fields will be way faster, but if they do want to filter on non-dimension fields, it comes at the cost of a slower query.

Seems like we should not restrict it to 10.

Agreed, it should be configurable and the feature should impose some defaults.

Not very clear why we need metric_type. If it is for the pre-compute optimization, it better named as metric_aggregation_target_type. Also, why should be one instead of many? Most cases same metric might be queries for different aggregation.

That makes sense. A metric can have its type, but its aggregation_target_type can have a 1-to-many mapping.

Why not percentiles, it will be critical to support?

Computing percentiles requires all data points to be present, which we lose in Proposal 2 as we store precomputed results. We can support the topN use case if N is a low value. Storing all datapoints, or using them while querying, defeats the purpose of precomputed results; if percentiles are part of the query, then the query should be smart enough to switch to proposal 1 and use TSIDs and docvalues.

Is it better represented as {D1:V1, D2:V2 .. Dn:Vn} ? The current one seems to indicate we are storing ordered values only . While it is good optimization, it might be too restrictive and may not allow addition of dimensions.

Supporting dynamic addition of new dimensions would be tough with this approach; a reindex would be a better strategy for such cases.

I think the new proposal also requires datastream or index rotation. So probably we can just focus on the problems of traditional approach rather than distracting with datastream. We also need to include how we do range queries across index boundaries.

Agreed, it will be used in conjunction with datastreams; I will add this to the doc. I need to check the existing approach for handling queries across boundaries, but it's a good callout.

It is possible that some queries might need just row data instead of the stats.

Ok, if it is a metric like min or max, then it would correspond to a single row, and I can think of use cases where a user may want to fetch the docID and docvalues for it. Like in the alerting plugin, a user may want to know the row to get more context around an alert. This is possible with the BKD approach too; all we need to do is also store a docID for such metrics, but it should be a configurable index setting. With proposal 1, they can always get all the rows of the query result. Good point!

Don't we need to have the metric name also in the TSID? Or do we just use the TSID as a container of all the metrics, with one index file per TSID?

Each (TSID, timestamp) will be associated with multiple metrics, as metrics are just used for stats aggregation and not for filtering/bucketing.

How do we filter if only subset of dimensions are passed during query time? Need another index structure it seems.

I have tried to answer this in the first question. I will be updating the doc with a clearer explanation.

If the TSID contains multiple metrics, and if it is ordered by timestamp, then this ordering won't work. This is why I thought the TSID would actually have the metric name as well. Please confirm.

I'm not sure if I understood it right, but this will always be the case: TSID1 -> {(timestamp1 -> {metric1, metric2, metric3}), (timestamp2 -> {metric1, metric2, metric3}), ...}; the sort order would be (TSID, timestamp).

Are you suggesting to use TSID as the routing field, call it out explicitly.

It would be based on dimensions. Will add more details in the doc.

Does it still require ISM or is there a way to do this inplace within the index?

Need to think more; this is an interesting problem though. Check https://issues.apache.org/jira/browse/LUCENE-10427, which proposes performing rollups at the time of segment merges.

Is the pre-computed stats in internal nodes the main benefit(seems like a big win)? Are there any other difference between proposal 1 and 2 if we ignore this optimization?

No, in terms of features, proposal 2 is a subset of proposal 1. Proposal 1 is how we can use most of the existing infrastructure of lucene and opensearch without compromising on any features.

Isn’t that true in case of proposal 1 as well? If you support filter on non-dimension with docvalue, need to handle the skips in docvalues and you still have the original i/o issue.

The filtering will happen in the search phase, so in the aggregation phase, where we fetch the DocValues, the docs will be passed in an ordered way (TSID, timestamp) per segment. We can create a global iterator, as explained in the doc, which will ensure the DocIdSetIterator iterates over docs in an ordered way across all segments. So each aggregator will read docvalues in an ordered way where matching docs are placed one after the other.

rishabhmaurya commented 2 years ago

Thanks for bringing this up @penghuo.

More thoughts on aggregation. TopK over multiple dimensions is a very common use case in metrics queries. But compared with terms aggregation, the OpenSearch multi-terms aggregation (https://github.com/opensearch-project/OpenSearch/pull/2687) is slow; based on the perf results (https://github.com/opensearch-project/OpenSearch/issues/1629), hash(d1, d2, ... dN) is the major contributor. TSID matching solves the problem of generating the docID set, but it may not help solve the TopK problem.

If K is a small number, then it can be stored as precomputed stats in proposal 2, which will significantly improve performance. So it can be an index setting; it's similar to the point @muralikpbhat raised about percentiles, which I have answered above.

The multi-terms aggregation feature is not specifically tied to the timeseries use case. Having a sort order of (TSID, metric associated with topK) instead of (TSID, timestamp) will significantly improve performance, as you will then get the metric in sorted order; there is no need to maintain a priority queue/heap per segment at aggregation time, and you will read doc values of only the relevant top K docs, which is a significant optimization.

rishabhmaurya commented 2 years ago

Changes related to BKD discussed here - https://github.com/rishabhmaurya/lucene/pull/1

gashutos commented 1 year ago

@rishabhmaurya, great proposal! One doubt though: won't we lose the point-based optimization Lucene introduced here? lucene-10320

I think, in both of these approaches, we merge the rest of the fields other than timestamp into one field. I agree that querying by the timestamp field would be faster, but what about use cases where we query on other dimension values too?

For example, look at this PR; we gain significantly from point-based sorting optimizations. OpenSearch-6424