Adrien Grand (@jpountz) (migrated from JIRA)
The best way to address such issues is on top of Lucene by having multiple shards whose results can be merged with TopDocs#merge.
Parallelizing based on ranges of doc IDs is problematic for some queries. For instance, the cost of evaluating a range query over an entire segment or over only a specific range of doc IDs is exactly the same, since it uses data structures that are organized by value rather than by doc ID.
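The shard-merging approach mentioned above can be sketched with a toy top-k merge in plain Java (this is not Lucene's actual TopDocs#merge API; the Hit record and method names are invented for illustration, and tie-breaking by shard/doc order is ignored):

```java
import java.util.*;

// Toy model of merging per-shard top hits, analogous in spirit to
// Lucene's TopDocs#merge. Each shard contributes its own score-sorted
// hits; the merge keeps the global top-k by score.
public class TopKMerge {
    // A hit is (shardId, docId, score); higher score ranks first.
    public record Hit(int shard, int doc, float score) {}

    public static List<Hit> merge(int k, List<List<Hit>> perShard) {
        // Min-heap by score: the weakest of the current top-k sits on top.
        PriorityQueue<Hit> pq =
            new PriorityQueue<>(Comparator.comparingDouble(Hit::score));
        for (List<Hit> hits : perShard) {
            for (Hit h : hits) {
                pq.offer(h);
                if (pq.size() > k) pq.poll(); // drop the weakest hit
            }
        }
        List<Hit> out = new ArrayList<>(pq);
        out.sort(Comparator.comparingDouble(Hit::score).reversed());
        return out;
    }
}
```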
Atri Sharma (@atris) (migrated from JIRA)
Thanks for the comments.
Having a multi-shard approach makes sense, but a search is still bottlenecked by the largest segment it needs to scan. If there are many segments of that size, that can become a problem.
While I agree that range queries might not directly benefit from parallel scans, other queries (such as TermQueries) might benefit from a segment-parallel scan. In a typical Elasticsearch interactive use case, we see latency spikes when a large segment is hit. Such cases can be optimized with parallel scans.
We should have a method of deciding whether a scan should be parallelized or not, and then let the execution operator get a set of nodes to execute. That is probably outside the scope of this JIRA, but I wanted to open this thread to get the conversation going.
Michael McCandless (@mikemccand) (migrated from JIRA)
I think it'd be interesting to explore intra-segment parallelism, but I agree w/ @jpountz that there are challenges :)
If you pass an ExecutorService to IndexSearcher today, you can already use multiple threads to answer one query, but the concurrency is tied to your segment geometry, and annoyingly a supposedly "optimized" index gets no concurrency ;)
But if you do have many segments, this can give a nice reduction to query latencies when QPS is well below the searcher's red-line capacity (probably at the expense of some hopefully small loss of red-line throughput because of the added overhead of thread scheduling). For certain use cases (large index, low typical query rate) this is a powerful approach.
It's true that one can also divide an index into more shards and run each shard concurrently but then you are also multiplying the fixed query setup cost which in some cases can be relatively significant.
Parallelizing based on ranges of doc IDs is problematic for some queries. For instance, the cost of evaluating a range query over an entire segment or over only a specific range of doc IDs is exactly the same, since it uses data structures that are organized by value rather than by doc ID.
Yeah that's a real problem – these queries traverse the BKD tree per-segment while creating the scorer, which is/can be the costly part, and then produce a bit set which is very fast to iterate over. This phase is not separately visible to the caller, unlike e.g. rewrite that MultiTermQueries use to translate into simpler queries, so it'd be tricky to build intra-segment concurrency on top ...
Adrien Grand (@jpountz) (migrated from JIRA)
If some segments are getting large enough that intra-segment parallelism becomes appealing, then maybe an easier and more efficient way to increase parallelism is to instead reduce the maximum segment size so that inter-segment parallelism has more potential for parallelizing query execution.
Atri Sharma (@atris) (migrated from JIRA)
If some segments are getting large enough that intra-segment parallelism becomes appealing, then maybe an easier and more efficient way to increase parallelism is to instead reduce the maximum segment size so that inter-segment parallelism has more potential for parallelizing query execution.
Would that not lead to many more segments than required? That could cause issues such as a large number of open file handles and too many threads required for scanning (although we could assign multiple small segments to a single thread).
Thanks for the point about range queries, that is an important consideration. I will follow up with a separate patch on top of this one which will do the first phase of BKD iteration and share the generated bitset across N parallel threads, where N equals the number of remaining clauses and each thread intersects one clause with the bitset.
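The sharing idea described above might look roughly like this (a hypothetical sketch in plain Java, not the actual patch; java.util.BitSet stands in for the bitset produced by the BKD first phase, and clause matches are also modeled as bit sets):

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch: run the expensive first phase once to get a bit set of
// candidate docs, then intersect each remaining clause with that
// shared bit set on its own thread.
public class SharedBitSetIntersect {
    public static List<BitSet> intersectClauses(
            BitSet candidates, List<BitSet> clauses, ExecutorService exec) {
        List<Future<BitSet>> futures = new ArrayList<>();
        for (BitSet clause : clauses) {
            futures.add(exec.submit(() -> {
                BitSet r = (BitSet) clause.clone(); // per-thread copy
                r.and(candidates);                  // intersect with shared candidates
                return r;
            }));
        }
        List<BitSet> results = new ArrayList<>();
        try {
            for (Future<BitSet> f : futures) results.add(f.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        return results;
    }
}
```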
Michael McCandless (@mikemccand) (migrated from JIRA)
If some segments are getting large enough that intra-segment parallelism becomes appealing, then maybe an easier and more efficient way to increase parallelism is to instead reduce the maximum segment size so that inter-segment parallelism has more potential for parallelizing query execution.
Yeah that is a good workaround given how Lucene works today.
It's essentially the same as your original suggestion ("make more shards and search them concurrently"), just at the segment instead of shard level.
But this still adds some cost – the per-segment fixed cost for each query. That cost should be lower than the per-shard fixed cost in the sharded case, but it is still additional cost.
If instead Lucene had a way to divide large segments into multiple work units (and I agree there are challenges with that! – not just BKD and multi-term queries, but e.g. how would early termination work?) then we could pay that per-segment fixed cost once for such segments then let multiple threads share the variable cost work of finding and ranking hits.
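The work-unit idea above could be sketched as follows (toy Java; an IntPredicate stands in for the real per-document matching work, and all names are invented). The per-segment fixed cost would be paid once when building the matcher, while the loop over the doc-ID range is the variable cost shared across threads:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.IntPredicate;

// Toy sketch of splitting one segment's doc-ID space [0, maxDoc) into
// ranges ("work units") that multiple threads scan in parallel.
public class DocIdRangeScan {
    public static int parallelCount(int maxDoc, int parts,
            IntPredicate matches, ExecutorService exec) {
        int chunk = (maxDoc + parts - 1) / parts;
        List<Future<Integer>> futures = new ArrayList<>();
        for (int p = 0; p < parts; p++) {
            final int from = p * chunk;
            final int to = Math.min(maxDoc, from + chunk);
            futures.add(exec.submit(() -> {
                int count = 0;
                for (int doc = from; doc < to; doc++) {
                    if (matches.test(doc)) count++; // variable-cost work
                }
                return count;
            }));
        }
        int total = 0;
        try {
            for (Future<Integer> f : futures) total += f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        return total;
    }
}
```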
In our recently launched production index we see sizable jumps in the P99+ query latencies when large segment merges finish and replicate, because we use "thread per segment" concurrency; we are hoping to improve this by pushing thread concurrency into individual large segments.
Atri Sharma (@atris) (migrated from JIRA)
Here are the results of luceneutil (patched to generate P50 and P90 and to run concurrent searching within IndexSearcher; the patch is posted to the luceneutil repo).
Adrien has a valid point about costly scorers not benefiting from this approach. Specifically, range queries can take a hit since the BKD tree's scorer is two-phase and expensive to construct, so building it per portion of a segment would increase latency, as is evident from the increase in P90 latency in the results. I am evaluating how to tackle this problem and will post any thoughts that seem viable. These benchmarks are targeted at measuring the changes in the "happy" path, i.e. the targeted large index sizes and low-QPS cases. Luceneutil was configured accordingly (low number of search threads, impacts turned off).
In summary, queries that scan a larger amount of data and have higher read latencies tend to see the maximum improvement. Term queries, and queries involving term queries on high-frequency terms, get a reasonable latency reduction.
The following are P50 and P90 latencies calculated by Luceneutil. P50 Base is the P50 latency of the base, P50 Cmp is the P50 latency of the competitor (patched version), and the same for P90.
Note: The QPS jumps are not real. Since Luceneutil was configured to run a single searcher thread, the QPS jump is proportional to the latency drop for each task.
Luceneutil results: https://gist.github.com/atris/9a06d511fdfa9de1b48b47e09d5ab8d2
I have attached the P50 and P90 latency graphs for high frequency phrase and term queries. It is apparent that queries with high frequency terms have sizeable improvements.
To address Adrien's point, I have some ideas to improve performance of BKD tree scorer for this case, will open a separate JIRA issue and link here.
@jpountz Are there any other concerns that you see here? Happy to address your feedback.
Atri Sharma (@atris) (migrated from JIRA)
Repeating the earlier results in a more human-readable form:
| Task | P50 Base | P50 Cmp | P50 Pct | P90 Base | P90 Cmp | P90 Pct |
|---|---|---|---|---|---|---|
| Wildcard | 9.993697 | 11.906981 | 19.1449070349 | 14.431318 | 13.953923 | -3.3080485095 |
| HighTermDayOfYearSort (DayOfYear) | 39.556908 | 44.389095 | 12.2157854198 | 62.421873 | 49.214184 | -21.1587515165 |
| AndHighHigh | 3.814074 | 2.459326 | -35.5197093711 | 5.045984 | 7.932029 | 57.1948900353 |
| OrHighHigh | 9.586193 | 5.846643 | -39.0097507947 | 14.978843 | 7.078967 | -52.7402283341 |
| MedPhrase | 3.210464 | 2.276356 | -29.0957319565 | 4.217049 | 3.852337 | -8.64851226533 |
| LowSpanNear | 11.247447 | 4.986828 | -55.6625783611 | 16.095342 | 6.121194 | -61.9691585305 |
| Fuzzy2 | 23.636902 | 20.959304 | -11.3280412128 | 112.5086 | 105.188025 | -6.50668037821 |
| OrNotHighHigh | 4.225917 | 2.62127 | -37.9715692476 | 6.11225 | 8.525249 | 39.4780809031 |
| OrHighNotLow | 4.015982 | 2.250697 | -43.956496817 | 10.636566 | 3.134868 | -70.5274427856 |
| BrowseMonthSSDVFacets | 66.920633 | 66.986841 | 0.0989351072038 | 67.230757 | 76.011531 | 13.0606502021 |
| Fuzzy1 | 14.779783 | 12.559705 | -15.0210459788 | 46.329521 | 218.272906 | 371.131367838 |
| HighSloppyPhrase | 21.362967 | 10.563982 | -50.5500242546 | 33.009649 | 15.74507 | -52.3016133858 |
| OrNotHighMed | 2.032775 | 1.584332 | -22.0606314029 | 2.529475 | 2.044107 | -19.1884877297 |
| LowPhrase | 4.937747 | 2.8876 | -41.5198875115 | 6.910574 | 5.159077 | -25.345173932 |
| AndHighLow | 1.097696 | 1.416176 | 29.0134973617 | 3.426081 | 13.987273 | 308.258678064 |
| LowTerm | 0.787595 | 1.038949 | 31.9141182968 | 1.12006 | 39.639455 | 3439.04746174 |
| BrowseDayOfYearSSDVFacets | 80.006624 | 80.215023 | 0.260477182489 | 80.610476 | 81.187614 | 0.71595905227 |
| Prefix3 | 3.347358 | 3.219213 | -3.82824305019 | 6.716371 | 5.21174 | -22.4024402464 |
| HighTermMonthSort (Month) | 20.684075 | 19.601521 | -5.23375592092 | 21.341383 | 20.092673 | -5.85112033274 |
| HighTerm | 2.991271 | 1.891199 | -36.7760727798 | 4.058212 | 2.320309 | -42.8243522024 |
| Respell | 17.33154 | 17.397468 | 0.38039320222 | 99.071728 | 66.75552 | -32.6190010535 |
| MedTerm | 3.011125 | 1.793175 | -40.4483374154 | 4.206761 | 2.392798 | -43.1201820118 |
| MedSloppyPhrase | 5.896878 | 3.304889 | -43.9552759952 | 8.044708 | 4.881775 | -39.316939782 |
| HighSpanNear | 20.981466 | 9.533211 | -54.5636563241 | 28.98951 | 11.087743 | -61.7525684291 |
| LowSloppyPhrase | 12.841091 | 6.075233 | -52.6891211969 | 18.539534 | 6.825001 | -63.1867715769 |
| OrHighNotHigh | 11.822146 | 6.645646 | -43.786466518 | 17.02398 | 7.935497 | -53.3863585366 |
| OrNotHighLow | 0.782455 | 1.06583 | 36.2161402253 | 1.668578 | 13.200645 | 691.131430476 |
| MedSpanNear | 3.161032 | 2.154472 | -31.8427652741 | 5.386012 | 5.665401 | 5.18730741781 |
| BrowseDateTaxoFacets | 444.971146 | 444.674024 | -0.066773318376 | 447.81169 | 445.950713 | -0.415571330887 |
| HighPhrase | 7.464241 | 4.644244 | -37.7800904338 | 25.153245 | 7.548758 | -69.9889298578 |
| OrHighLow | 6.344855 | 3.590218 | -43.4152868742 | 8.425453 | 15.578677 | 84.9001709463 |
| BrowseDayOfYearTaxoFacets | 0.16655 | 0.184125 | 10.5523866707 | 0.207908 | 0.224575 | 8.01652654059 |
| IntNRQ | 24.844282 | 12.870238 | -48.196377742 | 45.815197 | 57.190359 | 24.8283598999 |
| BrowseMonthTaxoFacets | 0.16488 | 0.170045 | 3.13258127123 | 0.203625 | 0.200508 | -1.53075506446 |
| AndHighMed | 2.109471 | 1.773399 | -15.9315771584 | 2.458244 | 3.943119 | 60.4038899312 |
| OrHighNotMed | 3.580582 | 3.088177 | -13.7520939333 | 4.196391 | 4.16434 | -0.763775348865 |
| PKLookup | 9.248977 | 9.76835 | 5.61546428324 | 47.86882 | 10.705417 | -77.6359287737 |
| OrHighMed | 9.072955 | 5.552202 | -38.8049207783 | 20.823925 | 7.961727 | -61.7664441262 |
Adrien Grand (@jpountz) (migrated from JIRA)
I wonder if we could avoid paying the cost of Scorer/BulkScorer initialization multiple times by implementing Cloneable on these classes, similarly to how we use cloning on IndexInputs to consume them from multiple threads. It would require implementing Cloneable on a few other classes, e.g. PostingsEnum, and maybe we'd need to set some restrictions to keep this feature reasonable, e.g. it's only legal to clone when the current doc ID is -1. But this could help parallelize collecting a single segment by assigning each clone its own range of doc IDs.
A downside of this approach is that it wouldn't help parallelize the initialization of Scorers, but I don't know if there is a way around it.
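A minimal sketch of what such a cloneable, range-restricted scorer might look like (hypothetical names, not Lucene's Scorer class; a shared BitSet stands in for the expensive-to-build state, and the "only legal to clone at doc == -1" restriction is enforced explicitly):

```java
import java.util.*;

// Hypothetical sketch: expensive setup happens once, clones share the
// result, and cloning is only legal before iteration starts (doc == -1).
public class CloneableScorerSketch {
    static class RangeScorer implements Cloneable {
        final BitSet matches;   // expensive-to-build, shared across clones
        int doc = -1;           // current doc, -1 means iteration not started
        int min = 0, max;       // doc-ID range this instance scans

        RangeScorer(BitSet matches, int maxDoc) {
            this.matches = matches;
            this.max = maxDoc;
        }

        RangeScorer cloneForRange(int min, int max) {
            if (doc != -1) throw new IllegalStateException("iteration started");
            try {
                RangeScorer c = (RangeScorer) super.clone(); // shares matches
                c.min = min;
                c.max = max;
                return c;
            } catch (CloneNotSupportedException e) {
                throw new AssertionError(e);
            }
        }

        // Advance to the next match within [min, max); MAX_VALUE = exhausted.
        int nextDoc() {
            int next = matches.nextSetBit(doc < min ? min : doc + 1);
            doc = (next < 0 || next >= max) ? Integer.MAX_VALUE : next;
            return doc;
        }
    }
}
```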
Michael McCandless (@mikemccand) (migrated from JIRA)
I wonder if we could avoid paying the cost of Scorer/BulkScorer initialization multiple times by implementing Cloneable on these classes, similarly to how we use cloning on IndexInputs to consume them from multiple threads.
+1
Thanks @jpountz for bringing this up as a feature for Lucene 10. It would be great to have if we can build it!
A few of us in Amazon Product Search (+cc @stefanvodita , @slow-J) had been looking into it out of curiosity. We discussed implementing Cloneable for Scorer/BulkScorer as previously suggested, but it looks like even implementing it just for the TermQuery scorer requires significant code changes, as there are quite a few dependencies that have to be Cloneable too. Maybe there is a hybrid approach? For example, when concurrent segment search is being initialized, it can try calling clone() for Scorer/BulkScorer, but if it throws CloneNotSupportedException, we fall back to creating a new Scorer/BulkScorer instance? Then we can implement the clone method only for scorers that are expensive to initialize, e.g. range query scorers mentioned in the comments above.
This approach still takes a lot of up-front effort. I’m curious if anyone has better ideas. The original patch seems to have been lost; it would have been really useful to have. Tagging @atris on the off-chance that he still has the patch or remembers what the implementation looked like.
Maybe there is a hybrid approach? For example, when concurrent segment search is being initialized, it can try calling clone() for Scorer/BulkScorer, but if it throws CloneNotSupportedException, we fall back to creating a new Scorer/BulkScorer instance?
My gut feeling is that it would work internally like that, so that we would not have to migrate all queries in one go. But hopefully on the caller side, there would be a single API to call.
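The internal fallback could be structured roughly like this (hypothetical interface and method names; a real API would live on Scorer/BulkScorer, and the caller would see only the single entry point):

```java
import java.util.function.Supplier;

// Sketch of the hybrid fallback: scorers that are cheap to duplicate
// implement copyForSlice(); everything else falls back to a factory
// that rebuilds the scorer, paying initialization again.
public class CloneOrCreate {
    interface SliceableScorer {
        // Default implementation refuses, signaling "clone not supported".
        default SliceableScorer copyForSlice() throws CloneNotSupportedException {
            throw new CloneNotSupportedException();
        }
    }

    public static SliceableScorer scorerForSlice(
            SliceableScorer prototype,
            Supplier<SliceableScorer> freshFactory) {
        try {
            return prototype.copyForSlice(); // cheap path
        } catch (CloneNotSupportedException e) {
            return freshFactory.get();       // expensive rebuild
        }
    }
}
```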
Instead of the clone() approach, I wonder if we could also allow ScorerSupplier#get to be called multiple times, and document that these scorers may be used in different threads. (We'd probably need to add ScorerSupplier#getBulkScorer as well.)
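A rough sketch of a supplier whose get() may be called once per slice, paying the expensive setup only once (hypothetical names, not Lucene's ScorerSupplier; the "expensive" phase is simplified to building a bit set, and each call hands out an independent cursor over its own doc-ID range):

```java
import java.util.*;

// Sketch of a reusable supplier: the expensive part (a precomputed bit
// set of matches) is built lazily once and shared by every iterator
// handed out, so calling get() per slice does not repeat the setup.
public class ReusableSupplierSketch {
    static class MatchSupplier {
        private BitSet matches;        // lazily built, then shared
        private final int maxDoc;
        private int buildCount = 0;    // for demonstration: how often we built

        MatchSupplier(int maxDoc) { this.maxDoc = maxDoc; }

        synchronized BitSet matches() {
            if (matches == null) {
                buildCount++;
                matches = new BitSet(maxDoc); // stand-in for BKD traversal
                for (int d = 0; d < maxDoc; d += 3) matches.set(d);
            }
            return matches;
        }

        // May be called multiple times; each call gets an independent cursor
        // restricted to [minDoc, maxDocExclusive).
        PrimitiveIterator.OfInt get(int minDoc, int maxDocExclusive) {
            BitSet m = matches();
            return m.stream()
                    .filter(d -> d >= minDoc && d < maxDocExclusive)
                    .iterator();
        }

        int buildCount() { return buildCount; }
    }
}
```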
I started to play around with a hacky implementation that:
LeafReaderContext
I don't claim this is a good API design, but it mostly just works; I wanted to do the least work possible to enable this using (mostly) existing APIs, to see what the impact would be on our application. I found the initial implementation in Lucene to be pretty straightforward, but in our application it seems we tie a lot of things to reader leaves that would now have to be tied to searcher leaves, and these tend to be stored in arrays sized by the number of leaves, which would no longer be viable – I expect an IndexSearcher might want to dynamically vary the way it slices an index. The main learning I take from this is that we do want an explicit API change reflecting it – otherwise it is easy to fall into traps where everything compiles and seems fine but things are actually quite broken, e.g. formerly you expected only a single thread to access your per-leaf data structures, but now multiple threads can. Also, @stefanvodita pointed me to Luca's branch https://github.com/javanna/lucene/commit/6ca5680c94452f02d38bd0a5ba881da9ae950bae which seems to have a somewhat cleaner API.
but I guess as an execution strategy it kind of made sense to me -- is it really necessary to clone Scorers? Could we create new ones for each searcher-segment or do we think that is too costly due to initialization costs?
There is another issue associated with this that I only realized when testing. When there are parent/child queries we need to take care not to split document blocks across leaves. I'm not quite sure how to approach this. Does IndexSearcher have access to a map of where doc block boundaries lie? And even if it does, taking those arbitrary bounds into account can be quite challenging in terms of selecting how to chop the index into intervals.
I guess we can use the new "parent field" to enforce that intervals must end on a non-child document, and we have to refuse to attempt query execution over a sub-leaf interval if there are doc blocks but no parent field, at least if the query is block-sensitive. I guess we will be enforcing that in Lucene 10 – at first I thought we were not, but then realized I was testing with 9x!
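Snapping split points to block boundaries might look like this (a sketch assuming the parent field is materialized as a bit set whose set bits mark the parent, i.e. last, document of each block; names are invented). A split at s is legal only if doc s-1 is a parent; otherwise the split moves forward past the block it would cut in half:

```java
import java.util.*;

// Sketch: adjust a proposed doc-ID split point so no document block
// (children followed by their parent) is split across two intervals.
public class BlockAwareSplit {
    public static int snapToBlockBoundary(int proposed, BitSet parents, int maxDoc) {
        if (proposed <= 0 || proposed >= maxDoc) {
            return Math.min(Math.max(proposed, 0), maxDoc); // clamp to [0, maxDoc]
        }
        if (parents.get(proposed - 1)) {
            return proposed; // already on a block boundary
        }
        int nextParent = parents.nextSetBit(proposed);
        // End the interval just after the parent that closes the cut block.
        return nextParent < 0 ? maxDoc : nextParent + 1;
    }
}
```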
but I guess as an execution strategy it kind of made sense to me -- is it really necessary to clone Scorers? Could we create new ones for each searcher-segment or do we think that is too costly due to initialization costs?
It depends on queries. For term queries, duplicating the overhead of looking up terms in the terms dict may be ok, but for multi-term queries and point queries that often compute the bit set of matches of the whole segment, this could significantly hurt throughput. Maybe it doesn't have to be this way for the first iteration (progress over perfection), but this feels important to me so that we don't have weird recommendations like "only enable intra-segment concurrency if you don't use multi-term or point queries".
Related: in the long term, I'd like inter-segment search concurrency to be enabled by default (#11523, maybe something else we should consider for 10.0), and ideally intra-segment search concurrency too, which is another reason why I care about avoiding introducing major downsides vs. single-threaded search.
One thing came up during my testing / messing around that I think could significantly affect the API we provide: whether we want to bake the algorithm for computing leaves/slices into the IndexSearcher as we do today, or whether it should be part of the search() API or some other top-level API. The reason I think we might want to expose the slicing/interval calculation is that it seems possible some Query implementations might be more or less amenable to being scored in this way – e.g. if they have specific interactions with Scorers; in particular I'm having difficulty with DrillSidewaysQuery. Another consideration is that we may want to modify the amount of concurrency we apply to each Query in response to operational conditions: if we're running hot we might want to use less concurrency per Query. I guess one alternative is to maintain multiple IndexSearchers with different characteristics, but this can become burdensome and is also less flexible – do you have an IndexSearcher for every possible number of threads you might want to apply to a single query?
we may want to modify the amount of concurrency we apply to each Query in response to operational conditions
I would really like it if we could do this. It could be a very tight and effective feedback loop. With this change, it's natural to think first about the latency improvement from added concurrency (fewer segments, more threads), but there are situations where the flexibility can allow us to improve throughput instead (more segments, fewer threads).
I guess one alternative is to maintain multiple IndexSearchers with different characteristics
Since IndexSearcher is very cheap to create, you could create a new IndexSearcher for every search? This is something you need to do today if you are using timeouts anyway, since you only want the timeout to apply to the search you're about to run.
I don't know – our IndexSearcher looks a little heavy; I think some of that is our own doing and we could tease it apart, but isn't e.g. the query cache tied to the IndexSearcher? And we have some other kinds of caches like a TermContextCache. I guess we could find a way to persist these across multiple searchers ... maybe. Then there is the SearcherTaxonomyManager, which is how we get access to the searcher – I don't know if it can manage multiple searchers. I'd really like to see if we can provide some API-level controls on this outside the constructor.
jpountz said: It depends on queries. For term queries, duplicating the overhead of looking up terms in the terms dict may be ok, but for multi-term queries and point queries that often compute the bit set of matches of the whole segment, this could significantly hurt throughput. Maybe it doesn't have to be this way for the first iteration (progress over perfection), but this feels important to me so that we don't have weird recommendations like "only enable intra-segment concurrency if you don't use multi-term or point queries".
I was thinking a bit about intra-segment concurrency this morning and got thinking specifically about multi-term, point, and vector queries that do most of their heavy-lifting up front (to the point where I've seen a bunch of profiles where relatively little time is spent actually iterating through DISIs).
Those queries (or at least their ScorerSuppliers) "know" when they're going to be expensive, so it feels like they're in the best position to say "I should be parallelized". What if ScorerSupplier could take a reference to the IndexSearcher's executor and return a CompletableFuture for the Scorer? Something like TermQuery could return a "completed" future, while "expensive" scorers could be computed on another thread. It could be a quick and easy way to parallelize some of the per-segment computation.
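The future-returning supplier could be sketched like this (hypothetical API, not Lucene's ScorerSupplier; a long[] of matching docs stands in for a real Scorer). Cheap queries complete immediately, while expensive ones initialize on the searcher's executor so several segments can set up at once:

```java
import java.util.concurrent.*;

// Sketch: a supplier decides for itself whether its scorer is expensive
// enough to build asynchronously on the searcher's executor.
public class AsyncScorerSupplier {
    public static CompletableFuture<long[]> scorer(
            boolean expensive, ExecutorService executor) {
        if (!expensive) {
            // e.g. TermQuery: setup is cheap, return a completed future
            return CompletableFuture.completedFuture(buildMatches());
        }
        // e.g. multi-term/point/vector queries: heavy up-front work,
        // compute on another thread
        return CompletableFuture.supplyAsync(AsyncScorerSupplier::buildMatches, executor);
    }

    private static long[] buildMatches() {
        return new long[] {2, 7, 12}; // stand-in for a computed match list
    }
}
```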
To add on to this, I was wondering if we could further extend the concurrent logic within a query. For example, in range queries today we traverse the BKD tree over the whole range. What if we could split the range into sub-ranges and give them to an executor to intersect? Then we could construct the DISI using multiple threads.
Similarly, in a terms query, we could have each term create its BitSets/Iterators in parallel, and then the conjunctions/disjunctions over them can happen all at once.
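That per-term parallelism could be sketched as follows (toy Java; int[] postings lists and java.util.BitSet stand in for real postings and match bit sets): each term's bit set is built on its own thread, then OR-ed together for the disjunction.

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch: build each term's bit set of matching docs in parallel, then
// combine them with a disjunction (OR) on the calling thread.
public class ParallelTermUnion {
    public static BitSet union(List<int[]> perTermDocs, ExecutorService exec) {
        List<Future<BitSet>> futures = new ArrayList<>();
        for (int[] docs : perTermDocs) {
            futures.add(exec.submit(() -> {
                BitSet b = new BitSet();      // per-term postings -> bit set
                for (int d : docs) b.set(d);
                return b;
            }));
        }
        BitSet result = new BitSet();
        try {
            for (Future<BitSet> f : futures) result.or(f.get()); // disjunction
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        return result;
    }
}
```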
I'd really like to keep intra-segment parallelism simple and stick to splitting the doc ID space, which is the most natural approach for queries that produce good iterators like term queries without a huge up-front cost.
I opened an initial draft of my take on intra-segment concurrency (#13542). It needs quite a bit of work and discussion, but I hope it helps as a start – hopefully getting intra-segment concurrency into Lucene 10; one can always hope :)
Segment search is a single-threaded operation today, which can be a bottleneck for large analytical workloads that index a lot of data and run complex queries touching multiple segments (imagine a composite query with a range query and filters on top). This ticket is for discussing the idea of splitting a single segment's search across multiple threads based on mutually exclusive document ID ranges.
This will be a two-phase effort, the first phase targeting queries that return all matching documents (collectors that do not terminate early). The second-phase patch will introduce staged execution and will build on top of this one.
Migrated from LUCENE-8675 by Atri Sharma (@atris), 1 vote, updated Aug 04 2022 Attachments: PhraseHighFreqP50.png, PhraseHighFreqP90.png, TermHighFreqP50.png, TermHighFreqP90.png