
Early Terminating CollectorManager [LUCENE-8929] #926

Open mikemccand opened 5 years ago

mikemccand commented 5 years ago

We should have an early terminating collector manager which accurately tracks hits across all of its collectors and determines when there are enough hits, allowing all the collectors to abort.

The options for the same are:

1) Shared total count: a global "scoreboard" where all collectors update their current hit count. After collecting each document, the collector checks whether N > threshold and aborts if so.

2) State-reporting collectors: collectors periodically report the total number of hits they have collected via a callback mechanism, and receive a proceed-or-abort decision.

Option 1) has the overhead of synchronization in the hot path; option 2) can collect unnecessary hits before aborting.

I am planning to work on 2), unless there are objections.
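For concreteness, here is a minimal sketch of how option 1)'s shared counter could look; the class and field names (SharedHitCountCollector, globalHitCount) are illustrative placeholders, not existing Lucene API or the eventual patch:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;

// Sketch of option 1): every collector bumps a shared counter per collected hit and
// aborts its leaf (via CollectionTerminatedException) once the global count passes
// the threshold. The local hit collection itself is omitted.
class SharedHitCountCollector extends SimpleCollector {
  private final AtomicInteger globalHitCount; // the shared "scoreboard"
  private final int totalHitsThreshold;

  SharedHitCountCollector(AtomicInteger globalHitCount, int totalHitsThreshold) {
    this.globalHitCount = globalHitCount;
    this.totalHitsThreshold = totalHitsThreshold;
  }

  @Override
  public void collect(int doc) throws IOException {
    // ... record the hit in this collector's local priority queue (omitted) ...
    if (globalHitCount.incrementAndGet() > totalHitsThreshold) {
      throw new CollectionTerminatedException(); // enough hits collected globally
    }
  }

  @Override
  public ScoreMode scoreMode() {
    return ScoreMode.COMPLETE_NO_SCORES;
  }
}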


Legacy Jira details

LUCENE-8929 by Atri Sharma (@atris) on Jul 22 2019, updated Dec 30 2020

mikemccand commented 5 years ago

What collector do you have in mind? Is it TopFieldCollector?

[Legacy Jira: Adrien Grand (@jpountz) on Jul 23 2019]

mikemccand commented 5 years ago

What collector do you have in mind? Is it TopFieldCollector?

Yes, that is the one.

 

I did some tests, and am now inclined to go with 1), since that is a less invasive change and allows accurate termination with minimal overhead (< 3% degradation). This is because AtomicInteger is generally implemented without a synchronization lock on modern hardware.

[Legacy Jira: Atri Sharma (@atris) on Jul 23 2019]

mikemccand commented 5 years ago

OK, so if I understand correctly you are still collecting the first numHits hits as today, but you are trying to avoid collecting ${totalHitsThreshold-numHits} additional hits on every slice with this global counter?

[Legacy Jira: Adrien Grand (@jpountz) on Jul 23 2019]

mikemccand commented 5 years ago

OK, so if I understand correctly you are still collecting the first numHits hits as today, but you are trying to avoid collecting ${totalHitsThreshold-numHits} additional hits on every slice with this global counter?

Yeah, exactly.

The first numHits hits can be spread across all the involved collectors, but with the global counter, all collectors will abort once they see that numHits hits have been collected globally, even though each individual collector's hit count is, obviously, < numHits.

[Legacy Jira: Atri Sharma (@atris) on Jul 23 2019]

mikemccand commented 5 years ago

Hmm, I am confused now; I don't think you can spread the top numHits hits across collectors given that index sorting works on a per-segment basis. So you need to collect each segment at least until {numHits} hits have been collected, or until the last collected hit was not competitive globally (whichever comes first)?

[Legacy Jira: Adrien Grand (@jpountz) on Jul 23 2019]

mikemccand commented 5 years ago

So you need to collect each segment at least until {numHits} hits have been collected, or until the last collected hit was not competitive globally (whichever comes first)

Yeah, sorry I was not clear. Per collector, we will collect until numHits hits are collected.

I have opened a PR implementing the same: https://github.com/apache/lucene-solr/pull/803

Hoping the code gives more clarity.

[Legacy Jira: Atri Sharma (@atris) on Jul 23 2019]

mikemccand commented 5 years ago

OK, so I have been working on this and am wondering what the definition (parameter) of a globally competitive hit should be. Should it be the largest of the worst accepted hits across all collectors, with all collectors using that as the minimum threshold when filtering further hits?

[Legacy Jira: Atri Sharma (@atris) on Jul 25 2019]

mikemccand commented 4 years ago

This has been sitting for a while, and since I ended up needing something similar at ${DAY_JOB}, I'm sharing our solution. The basic idea is the same as what @atris described above, but it ends up combining both (1) and (2) from the description, if I understand what was meant there. The comments in MaxScoreAccumulator go into the details.

The basic idea is that we periodically share the number of docs collected + the current worst score from each leaf. Then once we have (globally) enough hits, and a leaf can determine it is no longer competitive compared to the other leaves (i.e. it is the current worst), we terminate it. Then we revise the number of hits required of the remaining leaves, and if they are sufficient to cover that new reduced threshold, we can update the global worst score (to a better one), enabling a tighter bound for the remaining leaves. If the updating period were reduced to every hit we would essentially get a global priority queue, but that is too costly due to synchronization, so we back off to a reduced frequency.
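To make that concrete, here is a rough, simplified sketch of the bookkeeping; the names (GlobalTerminationState, updateAndCheckTerminate) are illustrative and this is not the actual MaxScoreAccumulator code. It assumes a higher score is better and leaves out the update-interval plumbing:

import java.util.ArrayList;
import java.util.List;

// Simplified sketch: each leaf periodically reports (hit count, current worst score).
// Once enough hits exist globally, the leaf holding the globally worst "worst score"
// is no longer competitive and may stop; removing it lets the remaining leaves bound
// against a better worst score on their next update.
class GlobalTerminationState {
  static final class LeafState {
    int hitCount;
    double worstScore = Double.NEGATIVE_INFINITY;
  }

  private final List<LeafState> leaves = new ArrayList<>();
  private final int requiredHits; // e.g. max(numHits, totalHitsThreshold)

  GlobalTerminationState(int requiredHits) {
    this.requiredHits = requiredHits;
  }

  synchronized LeafState register() {
    LeafState state = new LeafState();
    leaves.add(state);
    return state;
  }

  // Called every updateInterval hits by a leaf; returns true if that leaf may terminate.
  synchronized boolean updateAndCheckTerminate(LeafState leaf, int hitCount, double worstScore) {
    leaf.hitCount = hitCount;
    leaf.worstScore = worstScore;
    int totalHits = 0;
    boolean leafIsGlobalWorst = true;
    for (LeafState other : leaves) {
      totalHits += other.hitCount;
      if (other != leaf && other.worstScore < leaf.worstScore) {
        leafIsGlobalWorst = false;
      }
    }
    if (totalHits >= requiredHits && leafIsGlobalWorst) {
      leaves.remove(leaf); // the survivors now bound against a better worst score
      return true;
    }
    return false;
  }
}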

I tested perf with luceneutil creating a sorted index over DayOfYear and another one over LastModDate. The day of year sorted queries got about 20% QPS gain with this change. All the tests below were run with 16 searcher threads (this only applies to concurrent searches across multiple segments):

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 586.56 (2.8%) 716.24 (1.2%) 22.1% ( 17% - 26%)
HighTermDayOfYearSort 580.73 (2.8%) 721.21 (1.2%) 24.2% ( 19% - 28%)

Last mod date queries see a much smaller improvement. I believe this is because the values are indexed roughly in order, causing similar values to be grouped together in the same segment, which is an adversarial case for this algorithm.

Task QPS before StdDev QPS after StdDev Pct diff
LowTermLastModSort 1034.49 (4.3%) 1037.92 (3.1%) 0.3% ( -6% - 8%)
HighTermLastModSort 1056.17 (4.7%) 1109.77 (3.5%) 5.1% ( -2% - 13%)

Performance is unchanged when the conditions are not right, i.e. the index is not sorted by the query's sort, or relevance ranking is used:

Task QPS before StdDev QPS after StdDev Pct diff
HighTermDayOfYearSort 35.67 (2.2%) 35.45 (2.4%) -0.6% ( -5% - 4%)
OrHighHigh 14.07 (2.3%) 14.01 (3.1%) -0.4% ( -5% - 5%)
LowTerm 35.23 (2.2%) 35.11 (2.4%) -0.3% ( -4% - 4%)
MedTerm 34.81 (2.1%) 34.70 (2.2%) -0.3% ( -4% - 4%)
AndHighLow 34.68 (2.2%) 34.58 (2.5%) -0.3% ( -4% - 4%)
Wildcard 30.06 (1.8%) 29.98 (2.2%) -0.3% ( -4% - 3%)
HighTerm 34.99 (2.1%) 34.90 (2.4%) -0.3% ( -4% - 4%)
OrHighLow 32.55 (1.9%) 32.47 (2.3%) -0.3% ( -4% - 3%)
AndHighHigh 25.04 (2.7%) 24.98 (2.7%) -0.3% ( -5% - 5%)
PKLookup 58.95 (1.0%) 58.80 (1.2%) -0.2% ( -2% - 2%)
AndHighMed 30.45 (2.2%) 30.38 (2.3%) -0.2% ( -4% - 4%)
OrHighMed 28.19 (2.4%) 28.13 (2.8%) -0.2% ( -5% - 5%)

[Legacy Jira: Michael Sokolov (@msokolov) on Feb 03 2020]

mikemccand commented 4 years ago

Thanks @msokolov, these are nice results! And thank you for also fixing luceneutil to be able to benchmark sorted indices too ...

[Legacy Jira: Michael McCandless (@mikemccand) on Feb 04 2020]

mikemccand commented 4 years ago

OK, hm, I now believe the results posted above are somewhat suspect, since luceneutil had a bug when measuring the timing of multiple tasks under concurrent search. Since the use of multiple threads enables the test harness to consume all system resources, there can be contention among multiple tasks run concurrently, leading to starvation of the fast ones. I pushed a patch that works around this issue in luceneutil and did a new round of measurements. I also updated the PR (actually opened a new PR, #1316, since it had been so long it was hard to re-use the old one). Updates since the previous PR include:

  1. Added a tiebreak by docid to ensure we collect the right hits (there doesn't seem to be a test for this? Probably we should add one).
  2. No longer terminate by max-min scores; the impact is largely superseded by this change. I measured having both and it seemed to hurt in most cases. Possibly we should terminate based on the max-min score when 0 < N < 100, and use this approach (min-min score) for N > 100.
  3. This now bounds the thread count used to determine update frequency by the number of slices.
  4. I added some paranoid protections against a race condition that probably didn't exist, but I was going crazy with inconsistent results and this seemed like a good idea at the time. We might want to revisit this and remove some unneeded code in MaxScoreTerminator.

Here's a better description of what this PR is:

Performs early termination in TopFieldCollector (when the query sort matches the index sort) based on the minimum minimum score across all leaf collectors, providing nice gains for queries with large N. Once we have globally collected the number of hits required to satisfy the query (which is max(N, 1000)), the minimum minimum score across threads is a global lower bound on the score that must be met by any hit. We already have sufficient hits with scores better than that to satisfy the query, so any later hit with a worse score will be discarded, and any Collector retrieving such a hit can be terminated. Note that this is a looser bound than the maximum minimum computed in LUCENE-8978, but that bound can only be applied once a collector has collected N hits and all collectors together have collected 1000. When N is relatively large, this weaker bound often applies before N hits have been collected. Further, once a collector terminates, the same logic applies to the remaining collectors, which can raise the bound further. More precisely, the termination bound used is the minimum minimum score among the top collectors (ranked by their minimum scores) that together have at least max(N, 1000) hits.
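To illustrate the bound computation described above (an illustrative sketch with made-up names, not the PR code; it assumes a higher score is better):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the termination bound: rank leaves from best to worst by their current
// worst (minimum) score, take the best-ranked leaves until together they hold at
// least max(N, 1000) hits, and use that group's worst score as the global bound.
// Hits scoring below the bound are discarded, and a leaf whose worst score falls
// below it can terminate.
final class TerminationBound {
  static final class LeafSnapshot {
    final int hitCount;
    final double worstScore;

    LeafSnapshot(int hitCount, double worstScore) {
      this.hitCount = hitCount;
      this.worstScore = worstScore;
    }
  }

  static double computeBound(List<LeafSnapshot> leaves, int requiredHits) {
    List<LeafSnapshot> ranked = new ArrayList<>(leaves);
    ranked.sort(Comparator.comparingDouble((LeafSnapshot l) -> l.worstScore).reversed());
    int hits = 0;
    for (LeafSnapshot leaf : ranked) {
      hits += leaf.hitCount;
      if (hits >= requiredHits) {  // requiredHits = max(N, 1000) above
        return leaf.worstScore;    // the minimum minimum score over the covering group
      }
    }
    return Double.NEGATIVE_INFINITY; // not enough hits globally yet: no bound
  }
}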

N=1000

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 216.94 (0.3%) 549.10 (15.9%) 153.1% ( 136% - 169%)
HighTermDayOfYearSort 277.36 (0.9%) 1107.12 (25.9%) 299.2% ( 269% - 328%)

N=500

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 385.47 (5.4%) 649.01 (4.9%) 68.4% ( 55% - 83%)
HighTermDayOfYearSort 499.03 (6.5%) 1133.98 (27.5%) 127.2% ( 87% - 172%)

N=100

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 578.19 (1.4%) 589.86 (2.0%) 2.0% ( -1% - 5%)
HighTermDayOfYearSort 1444.25 (1.8%) 1508.98 (9.6%) 4.5% ( -6% - 16%)

N=20

Task QPS before StdDev QPS after StdDev Pct diff
HighTermDayOfYearSort 1857.14 (2.4%) 1766.53 (2.0%) -4.9% ( -9% - 0%)
LowTermDayOfYearSort 628.71 (2.3%) 620.24 (1.6%) -1.3% ( -5% - 2%)

 

[Legacy Jira: Michael Sokolov (@msokolov) on Mar 04 2020]

mikemccand commented 4 years ago

I posted a new revision that switches between max/min-based termination (what we had before this) and this min/min termination for higher N (numHits), and we now get uniformly better, or the same, results on benchmarks. Actually I find the "max/min" terminology pretty confusing since sorting is generally increasing, so we are really interested in min/max and max/max; I tried to use "worst" score in most places to avoid this confusion. Anyway, here are the updated results:

N=20

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 610.73 (1.5%) 609.69 (1.0%) -0.2% ( -2% - 2%)
HighTermDayOfYearSort 1791.55 (2.1%) 1814.44 (3.0%) 1.3% ( -3% - 6%)

N=100

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 568.79 (2.2%) 588.81 (0.5%) 3.5% ( 0% - 6%)
HighTermDayOfYearSort 1431.30 (12.4%) 1664.18 (9.6%) 16.3% ( -5% - 43%)

N=500

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 386.90 (5.0%) 585.41 (6.0%) 51.3% ( 38% - 65%)
HighTermDayOfYearSort 482.69 (7.7%) 1017.13 (30.5%) 110.7% ( 67% - 161%)

N=1000

Task QPS before StdDev QPS after StdDev Pct diff
LowTermDayOfYearSort 243.90 (3.1%) 547.16 (12.1%) 124.3% ( 105% - 144%)
HighTermDayOfYearSort 272.67 (3.4%) 1041.77 (33.4%) 282.1% ( 237% - 330%)

[Legacy Jira: Michael Sokolov (@msokolov) on Mar 08 2020]

mikemccand commented 4 years ago

Interesting results @msokolov, and thanks for cleaning up the TopFieldCollector.

Regarding the challenge, I wonder if DAY_OF_YEAR is a good candidate. Considering the cardinality of the field, it could be more efficient to sort the leaves based on their max values and number of documents before forking new threads? This is not the case here, but for time-based data, where the order of segments follows the natural order of insertion, sorting the segments prior to search can improve performance dramatically even for small top N. This is something we added in Elasticsearch to boost the performance of queries sorted by timestamp on time-based indices: https://github.com/elastic/elasticsearch/pull/44021
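A rough sketch of the leaf-ordering idea, assuming a query sorted descending on a long field such as a timestamp; the field name "timestamp" and the helper class are placeholders, not Elasticsearch or Lucene API:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

// Visit the segments with the largest maximum value first; when insertion order
// roughly follows the sort order, a descending-sort query can then terminate after
// the first few segments, even for small top N.
final class LeafOrdering {
  static List<LeafReaderContext> byMaxTimestampDesc(IndexReader reader) {
    List<LeafReaderContext> leaves = new ArrayList<>(reader.leaves());
    leaves.sort(Comparator.comparingLong((LeafReaderContext ctx) -> {
      try {
        PointValues points = ctx.reader().getPointValues("timestamp");
        return points == null ? Long.MIN_VALUE
            : LongPoint.decodeDimension(points.getMaxPackedValue(), 0);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }).reversed());
    return leaves;
  }
}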

For sorted queries in general, I think it could be interesting to differentiate requests that don't need to follow the natural order of segments. This is true for concurrent requests, but it shouldn't be limited to that case. Today we try to share a global state between leaves so that concurrent and sequential requests can early terminate efficiently. We also handle sorted indices and queries sorted by relevance with a tiebreaker, all of that in the same TopFieldCollector. I know you already made some cleanup, but maybe it is time for a clear split? Optimizing queries on sorted indices for large top N could be enhanced further if we add a special top-field collector for this purpose. You could, for instance, remove the leaf priority queue entirely since results are already sorted.

I am also not sure that we're comparing the same thing in the benchmark. If I understand the last PR correctly, leaves are terminated as soon as they've reached the global lower bound, so they don't break ties on doc ids. I'm not sure whether that makes a big difference in terms of performance, but it would at least make the top N non-deterministic, so that's a problem.

I am supportive of any improvements we want to make on sorted queries but we should also keep the TopFieldCollector simple.

Another idea that we discussed with Adrien would be to give the ability to skip documents in the LeafFieldComparator. This is similar in spirit to what we have in queries with setMinCompetitiveScore:

public interface LeafFieldComparator {
   void setBottom(final int slot) throws IOException;

   // ... existing LeafFieldComparator methods elided ...

   /** Proposed addition: an iterator over documents that may still be competitive
       given the current bottom, or null when the comparator cannot skip documents. */
   default DocIdSetIterator iterator() {
     return null;
   }
}

If the returned iterator is used in conjunction with the query, it should be possible to stop/modify the remaining collection when setBottom is called by the top collector. With this mechanism in place it would be much simpler to implement the optimization we added in Elasticsearch in https://github.com/elastic/elasticsearch/pull/49732. I am not sure if this would be usable for the optimization you want, but I wanted to share the idea since it could have the same impact on sorted queries in Lucene as block-max WAND has on queries sorted by score.
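Roughly, the way I read it, the collector would intersect the query's iterator with the comparator's iterator once the latter is non-null, e.g. something like the hand-rolled leapfrog below (purely illustrative; the callback interface is made up for the sketch):

import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;

// Leapfrog the query's iterator with the iterator exposed by the comparator, so that
// documents the comparator already knows are non-competitive (given the current
// bottom) are skipped without being scored or collected.
final class CompetitiveCollection {
  interface HitCallback {
    void collect(int doc) throws IOException;
  }

  static void collectCompetitive(DocIdSetIterator query, DocIdSetIterator competitive,
                                 HitCallback callback) throws IOException {
    int doc = query.nextDoc();
    while (doc != DocIdSetIterator.NO_MORE_DOCS) {
      int other = competitive.docID() < doc ? competitive.advance(doc) : competitive.docID();
      if (other == doc) {
        callback.collect(doc);
        doc = query.nextDoc();
      } else {
        doc = query.advance(other); // jump to the next potentially competitive doc
      }
    }
  }
}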

 

 

[Legacy Jira: Jim Ferenczi (@jimczi) on Mar 12 2020]

mikemccand commented 4 years ago

Thanks for the insightful comments, @jimczi, you've given me a lot to think about! I had not really considered sorting segments: that makes a lot of sense when documents are at least roughly inserted in sort order. I would have thought merges would interfere with that opto, but I guess for the most part it works out? The performance improvements you saw are stunning. It would be great if we could get the segment sorting ideas merged into the Lucene code base, no? I wonder how we determine when they are applicable though. In Elasticsearch is it done based on some a priori knowledge, or do you analyze the distribution and turn on the opto automatically? That would be compelling I think. On the other hand, the use case inspiring this does not tend to correlate index sort order and insertion order, so I don't think it would benefit as much from segment sorting (except by chance, or in special cases), so I think these are really two separate optimizations and issues. We should be sure to structure the code in such a way that it can accommodate them all and properly choose which one to apply. We don't have a formal query planner in Lucene, but I guess we are beginning to evolve one.

I think the idea of splitting collectors is a good one, to avoid overmuch complexity in a single collector, but there is also a good deal of shared code across these. I can give that a try and see what it looks like.

By the way, I did also run a test using luceneutil's "modification timestamp" field as the index sort and saw similar gains. I think that field is more tightly correlated with insertion order, and also has much higher cardinality, so it makes a good counterpoint: I'll post results here later once I can do a workup.

I hear your concern about the non-determinism due to tie-breaking, but I *think* this is accounted for by including (global) docid in the comparison in MaxScoreTerminator.LeafState? I may be missing something though. It doesn't seem we have a good unit test checking for this tiebreak. I'll add to TestTopFieldCollector.testRandomMaxScoreTermination to make sure that case is covered.

I'm not sure what to say about the LeafFieldComparator idea - it sounds powerful, but I am also a bit leery of these complex Comparators - they make other things more difficult since it becomes challenging to reason about the sort order "from the outside". I had to resort to some "instanceof" hackery to restrict consideration to cases where the comparator is numeric, and extracting the sort value from the comparator is pretty messy too. We pay a complexity cost here to handle some edge cases of more abstract comparators.

[Legacy Jira: Michael Sokolov (@msokolov) on Mar 12 2020]

mikemccand commented 4 years ago

Thanks @msokolov, sorry for the unstructured answer but we touch a lot of interesting topics here ;)

 By the way, I did also run a test using luceneutil's "modification timestamp" field as the index sort and saw similar gains. I think that field is more tightly correlated with insertion order, and also has much higher cardinality, so it makes a good counterpoint: I'll post results here later once I can do a workup.

I think this is expected, since you can "early terminate" running segments with the results of other running ones. My point, though, was more that you could achieve a similar (even superior) gain if you just sort the segments beforehand. It might not even be necessary to search the segments concurrently if the insertion order matches the sort order. In such a case, a sorted sequential search over segments can be more competitive.

 

  I would have thought merges would interfere with that opto, but I guess for the most part it works out?

The natural order is preserved by the tiered merge policy as well, so I don't think it's an issue.

 

 I'm not sure what to say about the LeafFieldComparator idea - it sounds powerful, but I am also a bit leery of these complex Comparators - they make other things more difficult since it becomes challenging to reason about the sort order "from the outside". I had to resort to some "instanceof" hackery to restrict consideration to cases where the comparator is numeric, and extracting the sort value from the comparator is pretty messy too. We pay a complexity cost here to handle some edge cases of more abstract comparators.

 

Yes, the main intent is not to handle concurrent requests. The best example here is a LongSortField that can skip documents during collection by comparing the current bottom value with the values indexed in the BKD tree. It would be easier to implement such optimizations directly in the FieldComparator since it's an abstraction of a queue.
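As a rough illustration of that (purely a sketch, assuming a single-dimension LongPoint field and a descending sort; it only builds the candidate doc set and leaves out how it would be wired into the comparator):

import java.io.IOException;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.PointValues.IntersectVisitor;
import org.apache.lucene.index.PointValues.Relation;
import org.apache.lucene.util.DocIdSetBuilder;

// Collect only documents whose long value is >= the current bottom, letting the
// BKD tree skip whole cells that cannot possibly be competitive.
final class CompetitiveLongDocs {
  static DocIdSetBuilder docsAtOrAboveBottom(PointValues points, long bottom, int maxDoc)
      throws IOException {
    DocIdSetBuilder builder = new DocIdSetBuilder(maxDoc);
    points.intersect(new IntersectVisitor() {
      private DocIdSetBuilder.BulkAdder adder;

      @Override
      public void grow(int count) {
        adder = builder.grow(count);
      }

      @Override
      public void visit(int docID) {
        adder.add(docID); // whole cell is competitive
      }

      @Override
      public void visit(int docID, byte[] packedValue) {
        if (LongPoint.decodeDimension(packedValue, 0) >= bottom) {
          adder.add(docID);
        }
      }

      @Override
      public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
        if (LongPoint.decodeDimension(maxPackedValue, 0) < bottom) {
          return Relation.CELL_OUTSIDE_QUERY; // nothing competitive here: skip the cell
        }
        if (LongPoint.decodeDimension(minPackedValue, 0) >= bottom) {
          return Relation.CELL_INSIDE_QUERY;
        }
        return Relation.CELL_CROSSES_QUERY;
      }
    });
    return builder;
  }
}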

 

 I hear your concern about the non-determinism due to tie-breaking, but I *think* this is accounted for by including (global) docid in the comparison in MaxScoreTerminator.LeafState? I may be missing something though. It doesn't seem we have a good unit test checking for this tiebreak. I'll add to TestTopFieldCollector.testRandomMaxScoreTermination to make sure that case is covered.

 

Sorry, it's just me looking too quickly. This makes me wonder where the gains are coming from, though. If we use all available threads from the beginning with no ordering, we're also "wasting" a lot of CPUs. Having different strategies to sort the leaves and to set the number of threads could depend on the data and the top N size, for instance. In the case where the leaves don't intersect, it is preferable to run fewer segments at a time, since we expect to skip the followers more efficiently if we start from the global min value.

 

[Legacy Jira: Jim Ferenczi (@jimczi) on Mar 12 2020]

mikemccand commented 4 years ago

Having different strategies to sort the leaves and to set the number of threads could depend on the data and the top N size, for instance.

Agreed, different strategies win in different situations. In our case, insertion order is not usually correlated with index sort order, although maybe we can have some influence there and use the segment-sorting idea + single-threaded search. However, when indexing is in more-or-less random order, I think this approach gets you better latency at some cost to throughput.

This makes me wonder where the gains are coming from, though.

When it is successful, we collect only the hits we need plus a small buffer from each segment, and we do them in parallel, so latency is reduced.

The latest patch here has the side effect of cleaning up the branching logic in IndexSearcher where we choose a strategy, but I realized this is not available anywhere else we use these collectors, nor easily available to users specializing IndexSearcher's behavior. If we get enough strategies here we may want to abstract them into a query planner at some point.

Updated perf stats from the patch:

N=10

Task QPS before StdDev QPS after StdDev Pct diff
LowTermLastModSort 2205.95 (7.0%) 2181.02 (2.5%) -1.1% ( -9% - 8%)
HighTermLastModSort 1167.99 (1.2%) 1274.97 (1.2%) 9.2% ( 6% - 11%)
HighTermDayOfYearSort 1972.26 (0.8%) 1987.16 (0.7%) 0.8% ( 0% - 2%)
LowTermDayOfYearSort 649.64 (0.7%) 669.05 (2.5%) 3.0% ( 0% - 6%)

N=20

Task QPS before StdDev QPS after StdDev Pct diff
LowTermLastModSort 2239.97 (8.2%) 2288.43 (7.8%) 2.2% ( -12% - 19%)
HighTermLastModSort 1097.37 (1.0%) 1190.45 (1.4%) 8.5% ( 5% - 11%)

N=100

Task QPS before StdDev QPS after StdDev Pct diff
HighTermLastModSort 847.34 (1.2%) 978.83 (1.6%) 15.5% ( 12% - 18%)
LowTermLastModSort 1540.43 (4.0%) 1807.39 (3.9%) 17.3% ( 9% - 26%)
LowTermDayOfYearSort 621.57 (2.7%) 656.28 (1.9%) 5.6% ( 0% - 10%)
HighTermDayOfYearSort 1497.62 (1.0%) 1617.08 (0.9%) 8.0% ( 6% - 9%)

N=500

Task QPS before StdDev QPS after StdDev Pct diff
HighTermLastModSort 409.67 (3.7%) 825.41 (2.0%) 101.5% ( 92% - 111%)
LowTermLastModSort 555.95 (7.5%) 1412.98 (12.6%) 154.2% ( 124% - 188%)
LowTermDayOfYearSort 417.82 (3.5%) 653.06 (10.9%) 56.3% ( 40% - 73%)
HighTermDayOfYearSort 498.74 (5.8%) 1044.63 (22.4%) 109.5% ( 76% - 146%)

Note: in all cases the tests were done with the index sorted according to the search criterion (these show tests against two different indexes).

[Legacy Jira: Michael Sokolov (@msokolov) on Mar 16 2020]