
[Performance] sort query improvement for sequential ordered data [e.g. timestamp field sort in log data] #12448

Open gashutos opened 1 year ago

gashutos commented 1 year ago

Problem statement

Currently in TopFieldCollector, we use a PriorityQueue (a binary min-heap implementation) to find the top K elements in ascending or descending order. Whenever a newly iterated document is competitive, we insert it into the PriorityQueue, and if the queue is full, we remove the least competitive document and replace it with the new one. The time complexity is O(n log(K)) in the worst case, where n is the number of competitive documents. Now, what if n is quite high and every insertion into the PriorityQueue takes exactly O(log(K))? We end up spending a lot of time in the heapification process itself. We do have skipping logic that reduces the iteration cost n whenever we update the PriorityQueue, but it doesn't work well in some scenarios.

For example, consider time-series log/metric data where the sort query is performed on a timestamp field. This field is ever-increasing (nearly sorted) and also has very high cardinality. Descending-order queries don't benefit much from the skipping logic, and the PriorityQueue pays O(log(K)) for almost every insertion because incoming documents arrive in increasing order.

For example, imagine the scenario below where we insert logical timestamps in increasing order 1, 2, 3, 4, .... The insertion of 16 hits the worst case, heapifying down to a leaf node, and the same is true for 17, 18, 19, ... (imagine we need the top 15 hits, so the PQ size is 15).


In the above scenario, the skipping logic doesn't help find the top K in descending order either. But we can do better on the priority queue side: since we know incoming documents arrive in ascending order (nearly sorted), we can skip heapifying them.
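To see the cost concretely, here is a toy demo (plain java.util code, not Lucene's PriorityQueue) that counts comparator calls while an ascending stream flows through a bounded min-heap:

```java
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

// Toy demo (not Lucene code): feeding an ascending sequence through a bounded
// min-heap of size K makes nearly every competitive hit pay ~log2(K) comparisons,
// because the evicted minimum sits at the root and its replacement sifts back down.
public class HeapCostDemo {
  public static void main(String[] args) {
    final int K = 1000;
    final long N = 1_000_000;
    AtomicLong comparisons = new AtomicLong();
    PriorityQueue<Long> pq = new PriorityQueue<>(K, (a, b) -> {
      comparisons.incrementAndGet();
      return Long.compare(a, b);
    });
    for (long i = 0; i < N; i++) {   // ascending input: every doc is competitive
      if (pq.size() < K) {
        pq.offer(i);
      } else if (i > pq.peek()) {
        pq.poll();                   // sifts the last leaf down ~log2(K) levels
        pq.offer(i);
      }
    }
    // Prints roughly N * log2(K), about 10 million comparisons for these parameters.
    System.out.println("comparator calls: " + comparisons.get());
  }
}
```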

Solution proposed

The crux of the proposed solution is that we don't need to heapify elements if they arrive in the desired order. The idea is to keep incoming competitive documents in a temporary circular array before inserting them into the priority queue and triggering heapification.

Algorithm

  1. Create a circular array of the same size as the priority queue. We should be able to add an element at the end and remove an element from the front of this circular array.
  2. If an incoming document qualifies after TopFieldCollector.checkThreshold(), keep inserting into the circular array until the order breaks, i.e. until the next document is smaller for a descending-order traversal (or vice versa for ascending). Remove the first element from the circular array if it is full while inserting.
  3. Once the order breaks, insert the entire circular array into the priority queue and empty the circular array.

With this, we will be able to skip millions of hits from going through the heapification process.
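To make this concrete, below is a minimal sketch of the buffering scheme for a descending top-K over raw long values. All names here are hypothetical (this is not the actual POC or a Lucene API), and the checkThreshold() interaction and doc-ID tie-breaking for duplicates are omitted:

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;

// Minimal sketch of lazy heapification for a descending top-K over long values.
// The buffer absorbs ascending runs; the heap is only touched when a run breaks.
final class LazyTopK {
  private final int k;
  private final PriorityQueue<Long> heap;   // min-heap holding the current top-K candidates
  private final ArrayDeque<Long> buffer;    // circular-array stand-in for the latest ascending run

  LazyTopK(int k) {
    this.k = k;
    this.heap = new PriorityQueue<>(k);
    this.buffer = new ArrayDeque<>(k);
  }

  void collect(long value) {
    if (buffer.isEmpty() || value >= buffer.peekLast()) {
      // Order holds: no heapification needed. If the buffer is full, the evicted
      // front element is dominated by k larger buffered values, so it is dropped entirely.
      if (buffer.size() == k) {
        buffer.pollFirst();
      }
      buffer.addLast(value);
    } else {
      // Order broke: pay the deferred heapification cost for the whole run at once.
      flush();
      buffer.addLast(value);
    }
  }

  private void flush() {
    for (long v : buffer) {
      if (heap.size() < k) {
        heap.offer(v);
      } else if (v > heap.peek()) {
        heap.poll();       // evict the least competitive candidate
        heap.offer(v);
      }
    }
    buffer.clear();
  }

  /** Flushes any remaining run and returns the top-K candidates (unordered). */
  PriorityQueue<Long> finish() {
    flush();
    return heap;
  }
}
```

For a fully ascending stream, the heap is touched only once, during the final flush, so at most K elements ever go through heapification.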

POC: POC code. I implemented the above POC for reference with a LinkedList instead of a circular array. It also needs some additional handling for cases like duplicate elements where the doc ID is given preference (but those should be minor changes).

Caveat of this implementation

The proposed solution works pretty well for ever-increasing or ever-decreasing ordered fields like time-series data. But it could add overhead if the data is completely random: every element incurs an additional comparison to be added to the circular array first before being inserted into the actual heap.

We have two ways to implement this:

  1. Enable this implementation by default for all types of BKD-indexed fields.
  2. Enable this implementation behind a boolean flag that can be set true/false at the collector level, similar to what we have for the PointBasedOptimization flag.

Approach 2 sounds very safe, but it results in more configuration overhead for users. Approach 1 can introduce overhead, but with random data the skipping logic based on BKD points works pretty well, so few insertions reach the priority queue and we don't see much overhead. I tested approach 1 with 13 million documents with random values on a numeric (long) field and didn't see any latency regression, since the skipping logic had already skipped many hits. We don't have a benchmark for numeric sort in Lucene itself, but I indexed LongPoint values in random order with 13 million points/documents and saw similar latency: 140 ms for the sort baseline versus 147 ms with the additional circular queue, on r6g.2xlarge EC2 instances (finding the top 1000 in descending order).

Suggestions from community

I would like to hear suggestions and thoughts from the community on the proposed change and better ways to implement it. The time-series descending-order use case in particular is very widely used, so this could cover a large slice of users and give them more optimized sort queries on such workloads.

Edit

Performance gain with this optimization

For this, I inserted 36 million documents with the schema below, keeping the @timestamp long field in (almost) ascending order.

```json
{
  "@timestamp": 898459201,
  "clientip": "211.11.9.0",
  "request": "GET /english/index.html HTTP/1.0",
  "status": 304,
  "size": 0
}
```

Below are the results (units in ms) when I query top(K) in descending order, where K = 5, 100, 1000.

Before my changes:

| top(K) | PQ time | total time |
| --- | --- | --- |
| 5 hits | 4 | 58 |
| 100 hits | 70 | 130 |
| 1000 hits | 95 | 197 |

After my changes:

| top(K) | PQ time | total time |
| --- | --- | --- |
| 5 hits | 0 | 47 |
| 100 hits | 1 | 59 |
| 1000 hits | 2 | 101 |

The higher the value of K, the more time heapification takes.

jpountz commented 1 year ago

Lazily heapifying sounds interesting, and thanks for sharing performance numbers when data occurs in random order. Do you also have performance numbers for the case when the index sort is the opposite order compared to the query sort? I'm curious how much this optimization can save in that case since this is what you're trying to optimize.

> We don't have a benchmark for numeric sort in Lucene itself

Did you look at this task in the nightly benchmarks? http://people.apache.org/~mikemccand/lucenebench/TermDTSort.html

You might also be interested in checking out this paper where Tencent describes optimizations that they made for a similar problem in section 4.5.2: they configure an index sort by ascending timestamp on their data, but still want to be able to perform both queries by ascending timestamp and descending timestamp. To handle the case when the index sort and the query sort are opposite, they query on exponentially growing windows of documents that match the end of the doc ID space.
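For reference, here is a rough sketch of my reading of that windowing idea, simplified to an in-memory array of timestamps ordered by doc ID. This is only an illustration of the paper's scheme, not Lucene code; in a real query a filter would reject most documents, which is what makes the exponential growth necessary:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of searching exponentially growing doc-ID windows from the end of the
// doc-ID space, for an index sorted by ascending timestamp but a descending query.
public class WindowedTopK {
  static List<Long> topKDescending(long[] timestampByDocId, int k) {
    List<Long> hits = new ArrayList<>();
    int end = timestampByDocId.length;   // the largest timestamps live at the end
    int window = k;
    while (hits.size() < k && end > 0) {
      int start = Math.max(0, end - window);
      for (int doc = start; doc < end; doc++) {
        // In a real query, a filter would reject most docs here; with a sparse
        // filter the window must keep growing before k hits are found.
        hits.add(timestampByDocId[doc]);
      }
      end = start;
      window *= 2;                       // double the window until k hits are collected
    }
    hits.sort(Comparator.reverseOrder());
    return hits.subList(0, Math.min(k, hits.size()));
  }
}
```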

gashutos commented 1 year ago

Thanks @jpountz for looking at this!

> Lazily heapifying sounds interesting, and thanks for sharing performance numbers when data occurs in random order. Do you also have performance numbers for the case when the index sort is the opposite order compared to the query sort? I'm curious how much this optimization can save in that case since this is what you're trying to optimize.

For this, I inserted 36 million documents with the schema below, keeping the @timestamp long field in (almost) ascending order.

```json
{
  "@timestamp": 898459201,
  "clientip": "211.11.9.0",
  "request": "GET /english/index.html HTTP/1.0",
  "status": 304,
  "size": 0
}
```

Below are the results (units in ms) when I query top(K) in descending order, where K = 5, 100, 1000.

Before my changes:

| top(K) | PQ time | total time |
| --- | --- | --- |
| 5 hits | 4 | 58 |
| 100 hits | 70 | 130 |
| 1000 hits | 95 | 197 |

After my changes:

| top(K) | PQ time | total time |
| --- | --- | --- |
| 5 hits | 0 | 47 |
| 100 hits | 1 | 59 |
| 1000 hits | 2 | 101 |

The higher the value of K, the more time heapification takes.

> You might also be interested in checking out this paper where Tencent describes optimizations that they made for a similar problem in section 4.5.2: they configure an index sort by ascending timestamp on their data, but still want to be able to perform both queries by ascending timestamp and descending timestamp. To handle the case when the index sort and the query sort are opposite, they query on exponentially growing windows of documents that match the end of the doc ID space.

Yeah, I have read this paper :) The main bottleneck I see there is that they use an index sort on the timestamp field, which leads to higher indexing latencies. That's a trade-off. I like our current implementation, which avoids an index sort and uses BKD points to skip non-competitive hits. If we add the lazy heapification proposed here, it will give a good advantage on top of the current implementation.

gashutos commented 1 year ago

I have updated the original issue description with the performance gain from this optimization.

jpountz commented 1 year ago

Thinking a bit more about this optimization, I wonder if it would still work well under concurrent indexing. If I understand the optimization correctly, it relies on the fact that the n-th collected document would generally have a more competitive value than the (n-k)-th collected document to keep inserting into the circular buffer. But this wouldn't be true, e.g. under concurrent indexing if flushing segments that have (k+1) docs or more?

For instance, assume two indexing threads that index 10 documents each between two consecutive refreshes. The first segment could have timestamps 0, 2, 4, ..., 18 and the second segment could have timestamps 1, 3, 5, ..., 19. Then when they get merged, this would create a segment whose timestamps would be 0, 2, 4, ..., 18, 1, 3, 5, ..., 19. Now if you collect the top-5 hits by descending timestamp, the optimization would automatically disable itself when it has timestamps [10, 12, 14, 16, 18] in the queue and sees timestamp 1, since 1 < 10?

gashutos commented 1 year ago

> Thinking a bit more about this optimization, I wonder if it would still work well under concurrent indexing. If I understand the optimization correctly, it relies on the fact that the n-th collected document would generally have a more competitive value than the (n-k)-th collected document to keep inserting into the circular buffer. But this wouldn't be true, e.g. under concurrent indexing if flushing segments that have (k+1) docs or more?

> For instance, assume two indexing threads that index 10 documents each between two consecutive refreshes. The first segment could have timestamps 0, 2, 4, ..., 18 and the second segment could have timestamps 1, 3, 5, ..., 19. Then when they get merged, this would create a segment whose timestamps would be 0, 2, 4, ..., 18, 1, 3, 5, ..., 19. Now if you collect the top-5 hits by descending timestamp, the optimization would automatically disable itself when it has timestamps [10, 12, 14, 16, 18] in the queue and sees timestamp 1, since 1 < 10?

Yes, this won't optimize the scenario where a concurrent flush is invoked with very few documents, say 10 documents per flush. Reading this article on concurrent flush (it might be old and not reflect the latest details of concurrent flushing, and I hope this is what you mean by concurrent indexing), it looks like a single flush would still contain thousands of documents if we are talking about millions. This optimization still works very well when the number of documents per flush is higher and top(K) is smaller, i.e. if a single flush has 1000 documents and we need the top(100) in descending order, we can still skip 900 documents from going through the heapification process.

> For instance, assume two indexing threads that index 10 documents each between two consecutive refreshes. The first segment could have timestamps 0, 2, 4, ..., 18 and the second segment could have timestamps 1, 3, 5, ..., 19. Then when they get merged, this would create a segment whose timestamps would be 0, 2, 4, ..., 18, 1, 3, 5, ..., 19. Now if you collect the top-5 hits by descending timestamp, the optimization would automatically disable itself when it has timestamps [10, 12, 14, 16, 18] in the queue and sees timestamp 1, since 1 < 10?

Let's modify the example you gave slightly: assume two indexing threads that index 100 documents each between two consecutive refreshes. The first segment could have timestamps 0, 2, 4, ..., 198 and the second segment could have timestamps 1, 3, 5, ..., 199. When they get merged, this creates a segment whose timestamps are 0, 2, 4, ..., 198, 1, 3, 5, ..., 199. Now collect the top-5 hits by descending timestamp. In this case:

  1. Once we have 0, 2, 4, 6, 8 in the priority queue, 10 is inserted into the circular array, and similarly 12, 14, 16 & 18. Those don't go through heapification since we are delaying it.
  2. Now 20 comes; the order is still maintained, so we insert 20 into the circular array and remove 10 (10 is completely removed, it never reaches the priority queue). The same happens for 22, 24, ..., 198.
  3. At this stage, we are left with 0, 2, 4, 6, 8 in the priority queue and 190, 192, 194, 196, 198 in the circular array.
  4. Now 1 comes and breaks the sequence, so we insert 190, 192, 194, 196, 198 into the priority queue, and since the priority queue size is 5, this removes 0, 2, 4, 6, 8. Only 1 remains in the circular array.
  5. 3, 5, 7, 9 come and are inserted into the circular array since the order is intact. (They wouldn't pass checkThreshold(), but let's assume checkThreshold() is not there.)
  6. 11 comes and is inserted into the circular array, removing 1. The same keeps going until we have 191, 193, 195, 197, 199 in the circular array.
  7. We are done now; we empty the circular array and insert the remaining 5 elements into the priority queue.

So we ended up skipping 185 of the 200 hits from going through the heapification process (only the initial 5 and the two flushes of 5 elements touch the heap).

Just for explanation I didn't take the skipping logic into consideration; otherwise, in step 4, all the hits from 1 to 189 would be marked non-competitive by the BKD point-based competitive iterator.
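As a sanity check, this merged-segment pattern can be fed through the LazyTopK sketch shown earlier in the issue description (again hypothetical code, not the actual POC; its bookkeeping differs slightly from the trace above in that the buffer fills before the queue does, but it ends with the same top 5):

```java
// Two interleaved ascending runs, as in the merged-segment example above.
public class MergedSegmentTrace {
  public static void main(String[] args) {
    LazyTopK topK = new LazyTopK(5);
    for (long t = 0; t <= 198; t += 2) topK.collect(t); // first segment's docs
    for (long t = 1; t <= 199; t += 2) topK.collect(t); // second segment's docs
    // Prints the surviving candidates: 195, 196, 197, 198, 199 (in heap order).
    System.out.println(topK.finish());
  }
}
```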

Thank you for reading this long explanation; I hope it clears up your doubt.

I know it might not look like a very clean solution, but descending sort order on a timestamp field, especially for logs/metrics scenarios, is very common, and this gives a sizable improvement to those queries.

backslasht commented 1 year ago

@gashutos - Slightly orthogonal to the proposal: given that the optimization is primarily based on the order of the documents in the segment, would it make sense to reverse the problem? Could the data be sorted and stored (based on input from the user) so that at retrieval time we can skip documents without heapifying (or read them in reverse order), since the order is then guaranteed?

What do you think?

gashutos commented 1 year ago

@backslasht The overhead of index sort is very high. I ingested the same 36 million documents above with and without an index sort on @timestamp, and the difference is at least 20%. Refer to the numbers below.

Without index sort on @timestamp:

| Metric | Task | Value | Unit |
| --- | --- | --- | --- |
| Min Throughput | index-append | 182029 | docs/s |
| Mean Throughput | index-append | 197051 | docs/s |
| Median Throughput | index-append | 195665 | docs/s |
| Max Throughput | index-append | 210468 | docs/s |
| 50th percentile latency | index-append | 165.423 | ms |
| 90th percentile latency | index-append | 231.446 | ms |
| 99th percentile latency | index-append | 908.578 | ms |
| 99.9th percentile latency | index-append | 8934.86 | ms |
| 99.99th percentile latency | index-append | 10348.3 | ms |
| 100th percentile latency | index-append | 10806.6 | ms |

With index sort on @timestamp in ascending order:

| Metric | Task | Value | Unit |
| --- | --- | --- | --- |
| Min Throughput | index-append | 141237 | docs/s |
| Mean Throughput | index-append | 149861 | docs/s |
| Median Throughput | index-append | 146907 | docs/s |
| Max Throughput | index-append | 167086 | docs/s |
| 50th percentile latency | index-append | 210.367 | ms |
| 90th percentile latency | index-append | 315.659 | ms |
| 99th percentile latency | index-append | 1458.79 | ms |
| 99.9th percentile latency | index-append | 10476.3 | ms |
| 99.99th percentile latency | index-append | 10963.3 | ms |
| 100th percentile latency | index-append | 10994.6 | ms |