Open jed326 opened 1 year ago
Some more background:

With #8306 we will properly support the `timeout` and `cancel_after_time_interval` search parameters. The goal of this issue is to properly support the `terminate_after` workflow.

In the non-concurrent case today, if the `terminate_after` parameter is set to a non-default value then an `EarlyTerminatingCollector` is used, which will throw an `EarlyTerminationException` once the specified number of docs have been collected. In the concurrent search case, we will not throw the `EarlyTerminationException` and instead throw `CollectionTerminatedException`, which we can think of as a "soft" termination: `collect` will still be called for all the docs and the search request will not immediately exit out due to the `EarlyTerminationException`. In order to keep this behavior the same between the concurrent and non-concurrent cases, we will need a way to track the number of collected documents across all the threads and subsequently throw the `EarlyTerminationException` once the sum of all docs collected reaches the specified doc count.
Some potential solutions:

1. We can use an `AtomicInteger` to synchronize the collected document count across all of the threads. However, `numCollected` is updated on each `LeafCollector::collect` call, so every single concurrent segment search thread will need to both read and update the document count at the same time. We would need to do some benchmarking to see how this impacts performance, but intuitively this seems like a not-insignificant amount of overhead being introduced (a minimal sketch follows this list).
2. Continue to throw `CollectionTerminatedException`, which will stop collecting docs after `maxCountHits` is reached for a given collector. All collectors will collect docs up to `maxCountHits` in this scenario. This solution is what would happen if we do not make any changes.
3. Once one collector hits `maxCountHits`, stop all other collectors even if they have not hit `maxCountHits` yet. This solution gets us some force termination behavior; however, I don't think the behavior is very intuitive for the user.
4. Do not support the `terminate_after` option when concurrent segment search is being used. We could make concurrent segment search concretely not support forceTermination, at least for now, and revisit this at a later point as needed.
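As a rough illustration of solution 1, the wrapper below shares a single `AtomicInteger` across every slice's `LeafCollector`. The class name is hypothetical, and to keep the sketch compilable against Lucene alone it throws `CollectionTerminatedException`; the real solution would surface the force-termination exception instead.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.FilterLeafCollector;
import org.apache.lucene.search.LeafCollector;

// Hypothetical wrapper for solution 1: every slice's LeafCollector shares one
// AtomicInteger, so terminate_after is enforced on the global count rather than per slice.
final class GloballyCountingLeafCollector extends FilterLeafCollector {
    private final AtomicInteger globalCount; // shared across all slice threads
    private final int maxCountHits;          // the terminate_after value

    GloballyCountingLeafCollector(LeafCollector in, AtomicInteger globalCount, int maxCountHits) {
        super(in);
        this.globalCount = globalCount;
        this.maxCountHits = maxCountHits;
    }

    @Override
    public void collect(int doc) throws IOException {
        // Contended read-modify-write on every collected doc -- this is exactly the
        // overhead the discussion is worried about.
        if (globalCount.incrementAndGet() > maxCountHits) {
            // The real solution would throw the force-termination exception here so the
            // whole request stops; CollectionTerminatedException keeps the sketch Lucene-only.
            throw new CollectionTerminatedException();
        }
        in.collect(doc);
    }
}
```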
In terms of functionality I think solution 1 is best as it keeps the behavior the same between concurrent and non-concurrent search use cases. However, I am concerned about the potential performance hit doing such synchronization.
Tagging @reta @sohami @andrross for some feedback/thoughts. Thanks!
> In terms of functionality I think solution 1 is best as it keeps the behavior the same between concurrent and non-concurrent search use cases. However, I am concerned about the potential performance hit doing such synchronization.
I agree with your analysis here. Any idea what the synchronization overhead might be?
I think option 4 is okay because it can be temporary and terminate_after
can be added later if users need it. I'm also not totally opposed to option 2 where we essentially add the caveat that in the concurrent case terminate_after
is per concurrent slice, not total count (I'm less sure this is a good idea). I think option 3 is my least preferred for the reason you cited.
There is potentially an option 1a where we don't keep an exact count (perhaps by updating the synchronized count every 10 or 100 or 1000 documents or whatever) in order to reduce the synchronization burden.
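A rough sketch of what this "option 1a" could look like, assuming a hypothetical accumulator owned by each slice; the class and method names are illustrative, not existing OpenSearch or Lucene APIs:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical "option 1a" accumulator: each slice counts locally and only touches the
// shared AtomicLong every STRIDE documents, trading an exact cutoff for less contention.
final class ApproximateSharedCounter {
    private static final int STRIDE = 1000;  // flush granularity (10 / 100 / 1000 per the comment above)
    private final AtomicLong globalCount;    // shared across slices
    private final long maxCountHits;
    private int localCount;                  // only touched by this slice's thread

    ApproximateSharedCounter(AtomicLong globalCount, long maxCountHits) {
        this.globalCount = globalCount;
        this.maxCountHits = maxCountHits;
    }

    /** Call once per collected doc; returns true once the approximate global count crosses the limit. */
    boolean incrementAndCheck() {
        if (++localCount < STRIDE) {
            return false;                               // no shared-memory traffic for most docs
        }
        long total = globalCount.addAndGet(localCount); // flush the local batch
        localCount = 0;
        return total >= maxCountHits;                   // may overshoot by up to STRIDE * numSlices docs
    }
}
```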
The additional synchronization has to be avoided at all costs - it kills the whole concurrent part of the search. However, Apache Lucene does that using e.g. `TopScoreDocCollector::createSharedManager` / `hitsThresholdChecker`; maybe we could leverage that.
> Apache Lucene does that using e.g. `TopScoreDocCollector::createSharedManager` / `hitsThresholdChecker`; maybe we could leverage that.
It looks like `hitsThresholdChecker` is implemented with an `AtomicLong`, which is basically the same as what I'm describing in solution 1: https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/HitsThresholdChecker.java#L26

I agree that synchronizing the doc count for each doc does kind of negate the concurrency part, but that would only be for the case where the `terminate_after` parameter is used. If `terminate_after` is not included in the request then we can skip the synchronization portions. I suppose I could do some benchmarking to get a sense of the overhead, but intuitively, incrementing a global docs counter on each doc collected means we aren't collecting docs concurrently anymore, so I don't think benchmarking results are really valuable at the moment.
That being said, I'm definitely inclined towards solution 2 right now and we can pick up this issue at a later date if necessary.
> That being said, I'm definitely inclined towards solution 2 right now and we can pick up this issue at a later date if necessary.

I think it makes sense; once we see concurrent search being used widely (hopefully), we could get back to this issue.

> That being said, I'm definitely inclined towards solution 2 right now and we can pick up this issue at a later date if necessary.

> I think it makes sense; once we see concurrent search being used widely (hopefully), we could get back to this issue.
The potential problem with solution 2 is that we would be codifying a behavior for terminate_after
and changing that behavior to solution 1 at a later point would potentially be a breaking change. Solution 4 (fail with an "unsupported" error for terminate_after
) is probably the better way to defer this work.
> The potential problem with solution 2 is that we would be codifying a behavior for terminate_after and changing that behavior to solution 1 at a later point would potentially be a breaking change.
To be fair, I don't think there are any risks here, right now we just do more work BUT we soft-limit the results. The outcome of the "hard" (to be implemented in the future) and "soft" flows should be exactly the same from the user facing perspective.
@reta @jed326 There may be some change from the user perspective. For example, if `terminate_after` is set to 100, each slice may collect fewer than 100 docs while the sum of the documents collected across slices may be more than 100. In that case we will still say that the request is not early terminated (as the early-termination flag on the query result will not be set), whereas with the sequential flow the flag will be set in the response. I guess @andrross is also pointing to the same behavior as a breaking change? I think we will need to handle this in the concurrent model, probably in the reduce phase?

The other case, where a single collector hits the threshold, is probably where user-facing behavior will not change, but in the backend more work will be done and the query may take longer to complete compared to the sequential path. We can improve that later by using one of the options above, but we will probably need to at least document that with the concurrent path `terminate_after` is applied to each slice instead of at the shard level.
@sohami Good callout! I do think that if we are documenting that `terminate_after` is going to be for each slice instead of each shard in the concurrent case, then it's more logically consistent for the first scenario you described to not return as early termination.

In my opinion, if we say that `terminate_after` is at a slice level, then the expectation is set that `terminate_after` doesn't work the same as in the non-concurrent case, and it's actually more confusing for the user, or at least a new user (as opposed to a user who is replacing their non-concurrent search workflow with concurrent search), that the first scenario you described returns as terminated early.

On the other hand, just looking at the code alone, the collectors aren't terminating early in that case, so it's also confusing from the developer perspective to add a check and set `terminatedEarly` to true in the reduce phase.

In short, I think we would be adding some unintuitive and confusing behaviors here in order to make it seem like the same query is terminating early in the concurrent vs non-concurrent search cases, and in that case I would prefer to go with solution 4 and return some sort of validation error if `terminate_after` is used with concurrent segment search enabled.
> I do think that if we are documenting that terminate_after is going to be for each slice instead of each shard in the concurrent case, then it's more logically consistent for the first scenario you described to not return as early termination.

I realized the same thing just after commenting.
Taking a step back, I think `terminate_after` is an intent from the user to return results fast in the case of big indices where there may be a lot of matches and it takes time to find them all because of the sequential nature of the search. With concurrent search, given the work is happening in parallel, treating it per slice and terminating the individual slices when the limit is hit is still going to preserve that user intent. If all the slices complete executing before hitting the limit then presumably the search completed faster, even though the total documents collected across slices (not returned in the response) are still more than the limit. Whereas if the index is big such that each slice has to do enough work with the potential to go beyond the limit, then putting this limit at the slice level will help to return the results faster.

One can view the current non-concurrent model as a special case of slice = 1, and treating this parameter at the slice level will have the same behavior in the concurrent vs current non-concurrent model for that case. I don't think we will need to preserve this exact behavior for the slice > 1 case as well (as that is a new behavior). So I am leaning back to Option 2 here, with a change in documentation for the parameter being at the slice level and having a soft limit for termination.
Thanks @sohami and @jed326, I agree with the conclusion. Also, we should probably keep in mind that concurrent search would never produce the same end result sets (in general, though they could look consistent sometimes) even for the same query when `terminate_after` is set: the concurrent part is obviously one reason, but the shared searcher pool is another; each query competes for execution (even sequential search has some flaws [1]).
> Taking a step back, I think `terminate_after` is an intent from the user to return results fast in the case of big indices where there may be a lot of matches and it takes time to find them all because of the sequential nature of the search. With concurrent search, given the work is happening in parallel, treating it per slice and terminating the individual slices when the limit is hit is still going to preserve that user intent.
Exactly, this is why I see no risks here (if implementation changes in future) - we will preserve the intent no matter what.
@reta @sohami @andrross thanks for the discussion on this! It looks like our consensus has landed on solution 2, which won't require any changes right now. To recap solution 2: in each leaf slice, after `maxCountHits` documents have been collected we will begin to short-circuit collecting the documents in that leaf slice and throw `CollectionTerminatedException`. This exception is swallowed in `ContextIndexSearcher`, causing us to continue to the next leaf.
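For reference, a minimal sketch of what this per-slice behavior amounts to (the class name is hypothetical; in OpenSearch the real logic lives in `EarlyTerminatingCollector`):

```java
import java.io.IOException;

import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;

// Hypothetical per-slice collector for solution 2: each slice keeps its own count
// (no shared state) and stops itself once maxCountHits is reached. The exception is
// caught by the searcher, which simply moves on, so only this slice stops early.
final class PerSliceTerminatingCollector extends SimpleCollector {
    private final int maxCountHits;
    private int numCollected; // per-slice count only

    PerSliceTerminatingCollector(int maxCountHits) {
        this.maxCountHits = maxCountHits;
    }

    @Override
    public void collect(int doc) throws IOException {
        if (++numCollected > maxCountHits) {
            throw new CollectionTerminatedException(); // "soft" termination for this slice only
        }
        // a real collector would delegate to the wrapped collector / record the hit here
    }

    @Override
    public ScoreMode scoreMode() {
        return ScoreMode.COMPLETE_NO_SCORES;
    }
}
```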
I will make a note on opensearch-project/documentation-website#2662 for the behavior update to treat `terminate_after` as a per leaf slice count. As for this issue, I think we can leave it open as a backlog issue to be picked up at a later date.
> Exactly, this is why I see no risks here (if implementation changes in future) - we will preserve the intent no matter what.
@reta Thanks, I missed the nuance that the results will be more or less the same, just that the system may do more work than is optimal. I agree that this can change in the future to be more efficient and is a backward compatible change as long as we preserve the intent. I'm on board with solution 2 as @jed326 has described.
Wanted to follow up on this issue since something that might have been overlooked last time we had this discussion is that, while the returned `hits` would still be controlled by the `size` parameter, the `hits.total.value` count could actually be different in the soft termination workflow that exists today.
Here's a toy example with 1 shard, 2 segments, 1 doc per segment:
curl -X PUT "localhost:9200/_cluster/settings?pretty" -H 'Content-Type: application/json' -d'{"transient" :{"search.concurrent_segment_search.enabled":"false"}}'
{
"acknowledged" : true,
"persistent" : { },
"transient" : {
"search" : {
"concurrent_segment_search" : {
"enabled" : "false"
}
}
}
}
curl -X GET "localhost:9200/my-index-000001/_search?pretty&terminate_after=1"
{
"took" : 28,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
}
]
}
}
curl -X PUT "localhost:9200/_cluster/settings?pretty" -H 'Content-Type: application/json' -d'{"transient" :{"search.concurrent_segment_search.enabled":"true"}}'
{
"acknowledged" : true,
"persistent" : { },
"transient" : {
"search" : {
"concurrent_segment_search" : {
"enabled" : "true"
}
}
}
}
curl -X GET "localhost:9200/my-index-000001/_search?pretty&terminate_after=1&size=1"
{
"took" : 2,
"timed_out" : false,
"terminated_early" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
}
]
}
}
We can see that while the `size` parameter does correctly limit the `hits` count, the `hits.total.value` count is summed up across the segment slices in the concurrent search case, because in soft termination the `terminate_after` parameter is enforced at the slice level. Moreover, if the `size` parameter is larger than `terminate_after`, the `hits` array could have more than `terminate_after` results as well:
curl -X GET "localhost:9200/my-index-000001/_search?pretty&terminate_after=1"
{
"took" : 11,
"timed_out" : false,
"terminated_early" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
},
{
"_index" : "my-index-000001",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "dengjay"
}
}
}
]
}
}
In the multi-shard case, the hits.total.value
is summed across shards:
curl "localhost:9200/*/_search?track_total_hits=true&terminate_after=1&pretty"
{
"took" : 11,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "P_Zwi4oB72JIsoqDMhYV",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
},
{
"_index" : "my-index-000002",
"_id" : "QPZwi4oB72JIsoqDWRa9",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
}
]
}
}
curl "localhost:9200/*/_search?track_total_hits=true&terminate_after=1&pretty&size=1"
{
"took" : 3,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "P_Zwi4oB72JIsoqDMhYV",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
}
]
}
}
For next steps I want to do some benchmarking for the following cases:
I suspect that there is not that large of a concurrency bottleneck when using a synchronized count across threads, because there is only contention when matching documents are found at the same time, and it's not the entire `collect` call that is being synchronized, only the part where we check the doc count. Gathering the data on this will help us decide if it's worth synchronizing the doc count across threads or if it's better to just disable concurrent search entirely when the `terminate_after` parameter is used.
@reta @sohami @andrross wanted to check in and see if any of you had any additional thoughts on a few things here:

1. Do you think the different hits count meaningfully changes the user experience here if the user intent of `terminate_after` is to return a subset of results quickly and if the precise hits count can still be limited by the `size` parameter?
2. Any additional ideas or thoughts on the benchmarking, or opinions on synchronizing the doc count across threads vs soft termination vs disabling concurrent search, or any ideas that I haven't mentioned at all?

@jed326 Earlier my thought was that the `size` parameter will control the number of docs returned, but you are right that in cases when `size` is more than `terminate_after` it will end up returning more hits in the response. Also, the other case which we didn't consider is a request with aggregations. I think with `terminate_after` using soft cancellation, if there is an aggregation in the request then it will also collect more documents compared to force termination in non-concurrent search. This can also be confusing to the users as results for the agg will probably differ too (can you verify this?). Given this I think we have the below options, and the benchmark data with force termination will be useful here.

- … `terminate_after` is present in query irrespective of value.
- 2.ii Based on benchmark data decide on concurrent vs non-concurrent path based on some threshold of `terminate_after` (this can be a follow-up for perf improvement).

thanks @jed326
> Do you think the different hits count meaningfully changes the user experience here if the user intent of terminate_after is to return a subset of results quickly and if the precise hits count can still be limited by the size parameter?

I think we could document that as a limitation: when using concurrent search, over-fetch could happen and the total may not be accurate.

> Any additional ideas or thoughts on the benchmarking, or opinions on synchronizing the doc count across threads vs soft termination vs disabling concurrent search, or any ideas that I haven't mentioned at all?

I think we may have no choice but to give up some accuracy vs adding more synchronization (and contention) points. This could be documented. I think it would become less of a problem once we introduce a per search request mechanism to control that (e.g. `concurrent: true | false`) so users could make the choice in context.
> Do you think the different hits count meaningfully changes the user experience here if the user intent of terminate_after is to return a subset of results quickly and if the precise hits count can still be limited by the size parameter?
>
> I think we could document that as a limitation: when using concurrent search, over-fetch could happen and the total may not be accurate.

@reta it is not only about over-fetching and doing more work in the backend but also returning more hits to the users. In cases like aggs the collected count will be different. Given that the results will vary between the concurrent and non-concurrent paths, and `terminate_after` is hinting to not include docs after a particular hit count, I think it will be confusing to the user. So if, by default, we keep the user experience the same by enforcing force termination and explicitly let the user choose the soft termination behavior (as needed, and with the behavior change documented), that will be a much better experience and create less confusion.

We will also get the results from the benchmark which @jed326 is planning to run to see how much the overhead is. If the performance is significantly worse compared to the non-concurrent flow, then we can decide to disable the concurrent flow for `terminate_after` cases.
@reta @sohami thanks for the discussion! Going to delay the benchmarking a little bit while I look into the issues described in https://github.com/opensearch-project/OpenSearch/issues/10054 to make sure the benchmarking results I come back with are accurate.
Hey @reta @sohami wanted to give an update on my investigation into `terminate_after` behavior. Basically I found 4 problems that need to be addressed:

1. A failure when `size=0`, `track_total_hits=true`, and `terminate_after` are used together. (Details)
2. `track_total_hits=true` does not correctly track the total hits if `terminate_after` is used. (Details)

curl -X GET "---/_search?size=0&track_total_hits=true&pretty"
{
"took" : 1,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 165346692,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
}
}
curl -X GET "---/_search?size=1&track_total_hits=true&terminate_after=1&pretty"
{
"took" : 1,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "gte"
},
"max_score" : 1.0,
"hits" : [
{
...
}
]
}
}
This is because of the change here to simulate soft termination in the results: https://github.com/opensearch-project/OpenSearch/blob/1118dcf3727b6a2dee598159d65560adfc4d0dec/server/src/main/java/org/opensearch/search/query/TopDocsCollectorContext.java#L578-L589

There is an edge case this doesn't cover though: if each slice has fewer than `terminate_after` documents then we won't go into this code block during `reduce`. See the example below:
curl -X GET "localhost:9200/my-index-000001/_search?pretty&terminate_after=1"
{
"took" : 3,
"timed_out" : false,
"terminated_early" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "4yhLr4oBXBfDW6cmIi_l",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
},
{
"_index" : "my-index-000001",
"_id" : "5ChLr4oBXBfDW6cmSy8F",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "dengjay"
}
}
}
]
}
}
There is actually an assertion in SearchPhaseController::getTotalHits
that is supposed to cover this scenario but given that we haven't seen this pop up with concurrent search test parameterization it means we're most likely missing coverage here.
https://github.com/opensearch-project/OpenSearch/blob/aca2e9de7daaadee77c33e21ff5215c3f1e8600f/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java#L785-L788
3. `illegal_argument_exception` when `size=0` and `track_total_hits=true` are used for concurrent aggs. (Details)

curl -X GET "opens-clust-1gj8zaf4fng1b-d6fa7bd00441ed0d.elb.us-east-1.amazonaws.com/_search?track_total_hits=true&pretty" -H 'Content-Type: application/json' -d'
{
"size": 0,
"query": {
"bool": {
"filter": {
"range": {
"trip_distance": {
"lt": 50,
"gte": 0
}
}
}
}
},
"aggs": {
"distance_histo": {
"histogram": {
"field": "trip_distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}'
{
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null"
}
],
"type" : "search_phase_execution_exception",
"reason" : "all shards failed",
"phase" : "query",
"grouped" : true,
"failed_shards" : [
{
"shard" : 0,
"index" : "nyc_taxis",
"node" : "K7DzKxU4Tyin-01SQBj-7A",
"reason" : {
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null"
}
}
],
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null"
}
}
},
"status" : 400
}
This is the same issue as [1] above; the exception is being handled by `MultiCollectorManager` in Lucene here instead of failing as an NPE.
4. `terminate_after` has no effect for concurrent aggs. (Details)

Concurrent Search Enabled:
curl -X GET "---/_search?track_total_hits=true&terminate_after=1&pretty" -H 'Content-Type: application/json' -d'
{
"size": 1,
"query": {
"bool": {
"filter": {
"range": {
"trip_distance": {
"lt": 50,
"gte": 0
}
}
}
}
},
"aggs": {
"distance_histo": {
"histogram": {
"field": "trip_distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}'
{
"took" : 7345,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "gte"
},
"max_score" : 0.0,
"hits" : [
{
"_index" : "nyc_taxis",
"_id" : "47sxkYoBYW-wfCAOkuIQ",
"_score" : 0.0,
"_source" : {
"payment_type" : "1",
"rate_code_id" : "1",
"tip_amount" : 1.5,
"tolls_amount" : 0.0,
"extra" : 0.5,
"passenger_count" : 1,
"pickup_location" : [
-74.00631713867188,
40.733638763427734
],
"dropoff_datetime" : "2015-01-07 21:15:13",
"trip_distance" : 0.43,
"store_and_fwd_flag" : "N",
"total_amount" : 8.3,
"fare_amount" : 5.5,
"pickup_datetime" : "2015-01-07 21:08:53",
"dropoff_location" : [
-74.00151062011719,
40.73076248168945
],
"mta_tax" : 0.5,
"vendor_id" : "2",
"improvement_surcharge" : 0.3
}
}
]
},
"aggregations" : {
"distance_histo" : {
"buckets" : [
{
"key" : 0.0,
"doc_count" : 37826898,
"total_amount_stats" : {
"count" : 37826898,
"min" : -499.0,
"max" : 989970.39,
"avg" : 7.954326743102223,
"sum" : 3.0088750637E8
}
},
{
"key" : 1.0,
"doc_count" : 54261042,
"total_amount_stats" : {
"count" : 54261042,
"min" : -69.7,
"max" : 650262.85,
"avg" : 10.610401890365468,
"sum" : 5.7573146261E8
}
},
Concurrent Search Disabled
curl -X GET "---/_search?track_total_hits=true&terminate_after=1&pretty" -H 'Content-Type: application/json' -d'
{
"size": 1,
"query": {
"bool": {
"filter": {
"range": {
"trip_distance": {
"lt": 50,
"gte": 0
}
}
}
}
},
"aggs": {
"distance_histo": {
"histogram": {
"field": "trip_distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}'
{
"took" : 2,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 0.0,
"hits" : [
{
"_index" : "nyc_taxis",
"_id" : "47sxkYoBYW-wfCAOkuIQ",
"_score" : 0.0,
"_source" : {
"payment_type" : "1",
"rate_code_id" : "1",
"tip_amount" : 1.5,
"tolls_amount" : 0.0,
"extra" : 0.5,
"passenger_count" : 1,
"pickup_location" : [
-74.00631713867188,
40.733638763427734
],
"dropoff_datetime" : "2015-01-07 21:15:13",
"trip_distance" : 0.43,
"store_and_fwd_flag" : "N",
"total_amount" : 8.3,
"fare_amount" : 5.5,
"pickup_datetime" : "2015-01-07 21:08:53",
"dropoff_location" : [
-74.00151062011719,
40.73076248168945
],
"mta_tax" : 0.5,
"vendor_id" : "2",
"improvement_surcharge" : 0.3
}
}
]
},
"aggregations" : {
"distance_histo" : {
"buckets" : [
{
"key" : 0.0,
"doc_count" : 1,
"total_amount_stats" : {
"count" : 1,
"min" : 8.3,
"max" : 8.3,
"avg" : 8.3,
"sum" : 8.3
}
}
]
}
}
}
`terminate_after` depends on forceTermination being true to terminate Aggregators. This is because the `MultiCollector` swallows the soft termination `CollectionTerminatedException` and only rethrows it once all collectors have terminated, so during soft termination, once the `EarlyTerminatingCollector` terminates, the `Aggregator` will keep on collecting. See https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java#L214-L233.
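A simplified sketch of the MultiCollector behavior being described, written against plain Lucene types (this is not the actual Lucene source; the real logic is at the link above):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;

// Simplified sketch: a terminated sub-collector is dropped and its exception swallowed;
// the exception is only propagated once every sub-collector has terminated. With soft
// termination the EarlyTerminatingCollector drops out here while the Aggregator keeps collecting.
final class SwallowingMultiLeafCollector implements LeafCollector {
    private final List<LeafCollector> collectors;

    SwallowingMultiLeafCollector(List<LeafCollector> collectors) {
        this.collectors = new ArrayList<>(collectors);
    }

    @Override
    public void setScorer(Scorable scorer) throws IOException {
        for (LeafCollector c : collectors) {
            c.setScorer(scorer);
        }
    }

    @Override
    public void collect(int doc) throws IOException {
        for (int i = collectors.size() - 1; i >= 0; i--) {
            try {
                collectors.get(i).collect(doc);
            } catch (CollectionTerminatedException e) {
                collectors.remove(i); // swallow: only this sub-collector stops collecting
            }
        }
        if (collectors.isEmpty()) {
            throw new CollectionTerminatedException(); // only now does the whole leaf stop
        }
    }
}
```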
[1], [3] are tracked in https://github.com/opensearch-project/OpenSearch/issues/10054

[2], [4] are both consequences of how `terminate_after`'s soft termination is implemented today and would be fixed by switching over to force termination.

So for the following reasons I think the best solution is to force terminate concurrent search by default and, if necessary, provide an option for the user to soft terminate:

- `track_total_hits=true` does not always give an `eq` relation, which is a behavior change. We certainly could make the total hits still accurate in the soft termination case, but in general I don't like the idea of fudging the hit count for it to be consistent. Moreover, this fix only applies to the TopDocs collector, and all the doc counts for other collectors (i.e. Aggregators) would still be wrong.
- `GlobalHitsThresholdChecker` used by `TopScoreDocCollector` is already synchronizing the doc count across threads. Moreover, any contention will only happen when matching docs are found at the same time, and even then the only part that is synchronized is the doc count check; the rest of the doc collection process can still be parallelized.

That being said, I will still follow up with the benchmarking results since I don't think any of the issues identified here will have an impact on result accuracy. From those results we can determine if we should disable concurrent search for the `terminate_after` workflow and/or if a soft termination option is necessary.
I have some benchmark numbers to discuss now, but before jumping into that I want to clarify some terminology we're using so that we're all on the same page. There are 2 things we need to look at here, which I will call force termination and soft/hard termination.

- Force termination refers to whether the `EarlyTerminatingCollector` throws an `EarlyTerminationException` and how that exception is subsequently handled: https://github.com/opensearch-project/OpenSearch/blob/ef167c02b587b7e32b10f6e63a54f1e7a249257e/server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollector.java#L94-L98
- Hard termination refers to `terminate_after` collecting the exact count of documents, while soft termination refers to some overfetching that can occur when we do not synchronize the doc count today.

That being said, we can call the current behavior of `terminate_after` that exists on `main` today "No Force Soft Termination". As explained in problem 4 in https://github.com/opensearch-project/OpenSearch/issues/8371#issuecomment-1726761922 above, the soft termination part of this behavior is incorrect and will not early terminate any of the other operators in the collector tree, including (but not limited to) any aggregators. Therefore I want to drive this discussion towards whether we should adopt Force Hard Termination or Force Soft Termination.
All benchmarking was performed on the nyc_taxis
data set using OSB:
health status index uuid pri rep docs.count
green open nyc_taxis Q93xA6nqSXCBcQmRtA5biA 1 0 165346692
Force Hard Termination changes can be found here: https://github.com/jed326/OpenSearch/tree/force-terminate_after
Force Soft Termination changes can be found here: https://github.com/jed326/OpenSearch/tree/force-terminate-unsync
First, let's look at a common use case where there is filtering in our query and we use terminate_after
with an agg:
With `size=1`:

| | date_histogram_agg-10000 | date_histogram_agg-100000 | date_histogram_agg-1000000 |
|---|---|---|---|
| Concurrent Search Disabled | 16.3908 | 24.3959 | 106.87 |
| Force Hard Termination / 0 slice | 15.1163 | 17.1754 | 59.0757 |
| Force Hard Termination / 4 slice | 18.3088 | 20.3513 | 57.8736 |
| Force Soft Termination / 0 slice | 19.5888 | 32.0781 | 127.135 |
| Force Soft Termination / 4 slice | 20.5357 | 20.5357 | 112.918 |
In this case our index has around 160m docs and the range query filters it down to around 10m, which we then perform the aggregation on.
With `size=0`:

| | date_histogram_agg-10000 | date_histogram_agg-100000 | date_histogram_agg-1000000 |
|---|---|---|---|
| Concurrent Search Disabled | 4.69771 | 12.6467 | 95.0182 |
| Force Hard Termination / 0 slice | 13.1265 | 12.5323 | 45.153 |
| Force Hard Termination / 4 slice | 15.86 | 17.2851 | 47.4627 |
| Force Soft Termination / 0 slice | 20.7045 | 30.8375 | 152.661 |
| Force Soft Termination / 4 slice | 21.359 | 27.841 | 143.685 |
On the other hand, we can also look at the worst case scenario: a query / agg that matches all the docs in the index.

| | default-terminate-after-10000000 | default-terminate-after-100000000 | default-terminate-after-1000000000 | distance_amount_agg-terminate-after-100000000 | distance_amount_agg-terminate-after-1000000000 |
|---|---|---|---|---|---|
| Concurrent Search Disabled | 302.061 | 2931.64 | 4832.69 | 14085.7 | 23283.7 |
| No Force Soft Termination / 0 slice | 351.496 | 966.018 | 971.802 | 9043.77 | 9049.07 |
| Force Hard Termination / 0 slice | 201.223 | 2077.96 | 3255.5 | 5248.2 | 8634.14 |
| Force Hard Termination / 4 slice | 204.913 | 2108.71 | 3192.53 | 5284.95 | 8985.04 |
| Force Soft Termination / 0 slice | 99.7383 | 662.852 | 596.704 | 6578.87 | 6580.42 |
From this we can see that as `terminate_after` approaches larger numbers we begin to see a bigger performance hit. Since the index has ~160m docs, the 1B `terminate_after` case is the absolute worst case scenario, where we maintain a synchronized doc count across the entire index.

Full results: workloads.txt terminate_after_benchmark.txt
Main takeaways:

- For very small `terminate_after` values non-concurrent search may outperform concurrent search, however this is already known for small data sets due to the added overhead for concurrent search.
- … `terminate_after`.
- As `terminate_after` gets even larger, eventually Force Soft Termination becomes more performant due to synchronization costs.

With the benchmarking data in mind, I see a few possible solutions:

- … `terminate_after` is used. I think this is a reasonable solution by itself since we typically expect smaller numbers to be used in `terminate_after` cases.
- … `terminate_after` is used and provide a configurable threshold above which Force Hard Termination behavior is used.
- … `terminate_after` threshold.

I do not think we should default to Force Soft Termination because the behavior could be dramatically different between the concurrent and non-concurrent cases then. For example, we would collect up to `slice_count * terminate_after` docs, and many of the cases that are `terminated_early: true` would be `false` when moving from non-concurrent search to concurrent search. Moreover, there's also problem 2 in the previous comment about how `track_total_hits=true` expects an `eq` relation every time.
My preference is solution 5 -- most immediately I think we can support Force Hard Termination as the default case and as follow-ups we can do the optimization to introduce a terminate_after
threshold below which we revert to non-concurrent search and also provide a Force Soft Termination mechanism for users who are looking to further speed up large terminate_after
queries.
Please let me know what you think @sohami @reta!
@jed326 Thanks for the detailed writeup and sharing the perf run numbers.
> For very small terminate_after values non-concurrent search may outperform concurrent search, however this is already known for small data sets due to the added overhead for concurrent search.

`terminate_after` is essentially limiting the matching docs which are processed as part of the request. I would expect in most cases the value of this parameter to be small (probably < 100k, this is based on my intuition) since it may be used to sample the document corpus. Also, the `size=0` use case is common for aggs, so considering that, I think in most common cases we will see concurrent search with force termination perform relatively worse than non-concurrent search. For use cases where the `terminate_after` value is very large, I would assume those to be as good as running the query without the `terminate_after` clause, and concurrent search can be used in those cases to get the latency benefit.
> I do not think we should default to Force Soft Termination because the behavior could be dramatically different between the concurrent and non-concurrent cases then. For example, we would collect up to slice_count * terminate_after docs and many of the cases that are terminated_early: true would be false when moving from non-concurrent search to concurrent search. Moreover there's also problem 2 in the previous comment about how track_total_hits=true expects an eq relation every time.

I agree, Force Soft Termination seems to cause a lot more issues, as the behavior with `track_total_hits=true` and the results for other operations in the collector tree (like aggs), or even the hits count, vary w.r.t. non-concurrent search. The complexity of understanding these changes may not be worth the latency improvement here, and it can be taken as a follow-up based on feedback on usage patterns from different users. Also, given that in some cases force hard termination is better than the non-concurrent case, we may not need this improvement.
> My preference is solution 5 -- most immediately I think we can support Force Hard Termination as the default case and as follow-ups we can do the optimization to introduce a terminate_after threshold below which we revert to non-concurrent search and also provide a Force Soft Termination mechanism for users who are looking to further speed up large terminate_after queries.

I agree with Option 5, with the exception of the default being non-concurrent search. Since we don't have any clear winner for `terminate_after`, keeping the default as the non-concurrent path at least doesn't introduce any regression. I was thinking we can probably provide a cluster-setting-based threshold mechanism (with the default value being very high to force the non-concurrent path); if it is set to a value that is less than the `terminate_after` value in a search request, the request can follow the concurrent path. Ideally, the best would be to have some smartness on the server side to decide when to take the concurrent vs non-concurrent path based on some heuristics like the `terminate_after` value vs the total doc count. But that will require us to look into different types of workloads to come up with a good default. The setting mechanism provides flexibility in the absence of that default, which can vary for different workloads. I am fine having this as a follow-up again, as it is a perf improvement over the current behavior and does not create any regression for now. But given you already have an implementation and it's just about introducing a new setting, we could as well provide this option now. Would like to hear thoughts from @reta as well on this.
Thanks @jed326 and @sohami, it is hard to disagree with the conclusions (based on the results). +1 to @sohami that the default path should be non-concurrent search (for the `terminate_after` use case); that would minimize the risk of possible regressions (we have done a pretty limited set of benchmarks).
> But given you already have an implementation and it's just about introducing a new setting, we could as well provide this option now.

I have probably referred to this a few times already, but I believe we could make the choice driven by the search request, for example something like this:
{
"terminate_after": ...,
"terminate_after_threshold": "exact | hard | soft"
}
Using settings for thresholds is (in my opinion) very difficult to get right, even on a per-index basis: it either applies to all queries or none of them, so basically we ask the user to either make a guess or to profile her queries extensively. With per-search-request tuning the user could implement the heuristic herself by running the same query in concurrent / non-concurrent fashion and returning the first results (ideally, that is what we could possibly do ourselves, but doing more work in the background could impact cluster operations).
@reta @sohami thanks! Defaulting to non-concurrent search case sounds good to me.
For the settings part today we have cluster settings, index settings, and request parameters, each one taking higher priority and overriding the previous. Just like how for concurrent search enable/disable we have provided both cluster and index settings, I think it would make sense to introduce a cluster or index level setting like @sohami is suggesting and as a follow-up provide the request level options to override it. This would be similar to how the request cache index setting is overridden by the request cache request parameter. How does this sound to you @reta?
> Using settings for thresholds is (in my opinion) very difficult to get right, even on a per-index basis: it either applies to all queries or none of them, so basically we ask the user to either make a guess or to profile her queries extensively.

@reta Picking the right value for thresholds will definitely require some profiling of the workload to come up with the right defaults, but in the homogeneous workload case that should be doable. The intention of the cluster setting (I don't think an index setting will make sense here) was to cover such homogeneous cases where concurrent search could be used even with the `terminate_after` parameter. This will at least help us learn from user workloads where it is enabled and works well with concurrent search. Using this data, a better mechanism can probably be added in the backend to dynamically choose the concurrent vs non-concurrent path, or even to come up with defaults for this setting which work best for most of the cases.

> With per-search-request tuning the user could implement the heuristic herself by running the same query in concurrent / non-concurrent fashion and returning the first results (ideally, that is what we could possibly do ourselves, but doing more work in the background could impact cluster operations).

A request-level parameter will help with mixed workloads where coming up with a default is not easy, but that will be the case for all search request types (not only `terminate_after`). In such cases, in the absence of a request-level option, the user can choose to enable/disable concurrent search at the cluster level. I am treating this as more of an expert-level control. As we learn more about such use cases, we can follow up with adding this support, and either clients or the server can implement a request-hedging mechanism (as you are suggesting) at the expense of increased work. This will require the cancellation mechanism to be improved to ensure the extra work can definitely be cancelled as required.
Thanks @jed326
> This would be similar to how the request cache index setting is overridden by the request cache request parameter. How does this sound to you @reta?

I am not against settings, but I would like to understand what guidance should be provided to users for configuring them. Do you have a clear set of steps or a procedure (we will have to provide the documentation anyway) for how non-expert users should pick the value for each of them? And since we are dealing with changing indices, how can users be sure the settings they picked yesterday are still relevant today? (figuratively speaking)
> Do you have a clear set of steps or a procedure (we will have to provide the documentation anyway) for how non-expert users should pick the value for each of them?

We will definitely provide documentation, but as @sohami mentioned above, the main intention of a setting is to cover a broad spectrum of cases and give the user an additional control to revert back to the non-concurrent search path if they see performance regressions in their workload.
Broadly speaking, if the user is using a `terminate_after` greater than the threshold and is seeing performance regressions, they can increase the threshold, while if the user is using a `terminate_after` smaller than the threshold and would like to see if they can improve their search request time, they can try decreasing the threshold.
@sohami @jed326 sure, as I mentioned I am not against that if we are able to guide the user on configuring those; a request-level setting significantly reduces the usage gap and the onboarding process for the user.
Keeping this issue open for now since we still want to follow up to support concurrent search for the `terminate_after` workflow; as part of https://github.com/opensearch-project/OpenSearch/pull/10200 we disabled concurrent segment search for the `terminate_after` path.
Summarizing the remaining issues below:

Based on the discussion above in https://github.com/opensearch-project/OpenSearch/issues/8371#issuecomment-1734733950, we want to go forward with implementing a threshold setting above which we can use concurrent segment search for the `terminate_after` workflow. A prerequisite for this is to support `forceTermination` so that the `EarlyTerminatingCollector` is able to properly terminate the collector tree.

Summary of the issues:

1. The `TotalHitCountCollectorManager` used when `size=0` with `terminate_after` doesn't always compute the correct `TotalHits.relation` in the reduce phase. This requires a deep dive to figure out what is needed to support this edge case.
2. `TopDocsCollectorContext::newTopDocs` also needs to compute the correct relation. Whenever `track_total_hits=true` the relation needs to always be `eq`.
3. The `CollectionTerminatedException` thrown in the soft termination path is swallowed in `ContextIndexSearcher` and will cause other operators in the collector tree (e.g. Aggregators) to not terminate collection. This is why forceTermination is needed.

A POC that addresses many of these can be found here: https://github.com/opensearch-project/OpenSearch/commit/3bd8fe152d04c9bb2846dd0f2b37c2dcd0c71b47
> The TotalHitCountCollectorManager used when size=0 with terminate_after doesn't always compute the correct TotalHits.relation in the reduce phase. This requires a deep dive to figure out what is needed to support this edge case.

With the use of `Weight#count` in `Collector`s, you can no longer guarantee an EQ relation when `terminate_after` is set. I think we should just document this behavior, or make the use of `Weight#count` optional (which might be a breaking change). This is orthogonal to concurrent search.
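A rough illustration of the `Weight#count` shortcut being referenced (simplified, not the actual `TotalHitCountCollector` logic): when a segment can report its exact match count up front, hits are added in bulk and individual documents are never collected, so a `terminate_after` limit cannot stop the count at an exact boundary.

```java
import java.io.IOException;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;

// Simplified per-leaf hit counting, illustrating why Weight#count bypasses per-doc collection.
final class LeafHitCounter {
    static int countHits(Weight weight, LeafReaderContext leaf) throws IOException {
        int leafCount = weight.count(leaf);  // -1 means "unknown, iterate the docs"
        if (leafCount != -1) {
            return leafCount;                // whole-segment count, no per-doc collection happens
        }
        Scorer scorer = weight.scorer(leaf); // fall back to counting one document at a time
        if (scorer == null) {
            return 0;
        }
        int count = 0;
        DocIdSetIterator it = scorer.iterator();
        while (it.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
            count++;
        }
        return count;
    }
}
```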
Are 2 & 3 caused by concurrent search? I would focus on behavior changes caused by concurrent search.
Late to the party, but just wanted to quickly chime in.
> Based on the discussion above in https://github.com/opensearch-project/OpenSearch/issues/8371#issuecomment-1734733950, we want to go forward with implementing a threshold setting above which we can use concurrent segment search for the terminate_after workflow.

So, this threshold is actually some kind of ratio, right? As the search space (the size of indexed data) grows, this threshold can grow, too? In general, if there are at least three segments to process, I think concurrent search will do better than non-concurrent. Maybe even two. If `terminate_after` > numDocs of the first segment, it may be safe to go with concurrent. Do you sort the segments in any way before starting `searchLeaf`? If you sort from smallest to largest, you might increase the benefits of concurrent search.
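Purely as an illustration of the smallest-first idea (this is not how OpenSearch currently orders leaves or builds slices), the ordering could look something like:

```java
import java.util.Comparator;
import java.util.List;

import org.apache.lucene.index.LeafReaderContext;

// Illustrative only: visit smaller segments first so more slices finish (or terminate)
// quickly before the large segments are reached.
final class LeafOrdering {
    static List<LeafReaderContext> smallestFirst(List<LeafReaderContext> leaves) {
        return leaves.stream()
            .sorted(Comparator.comparingInt((LeafReaderContext ctx) -> ctx.reader().maxDoc()))
            .toList();
    }
}
```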
> So, this threshold is actually some kind of ratio, right? As the search space (the size of indexed data) grows, this threshold can grow, too?

I think we don't know yet for sure: there are many variables (number of segments, slices, number of documents, thread pool saturation, etc.), and we probably need many more experiments to get insights here.

> Maybe even two. If terminate_after > numDocs of the first segment, it may be safe to go with concurrent.

I think the decision about which path to take is made way before that, and it also depends on sorting and index type (data streams), but again, more experiments could help.

> Do you sort the segments in any way before starting searchLeaf? If you sort from smallest to largest, you might increase the benefits of concurrent search.

We do, I believe, but not by size; specifically for data streams, depending on the sort criteria, we could reverse the traversal.
As a follow-up to #8306, we need to add support for forceTermination in the concurrent search case. Creating a new issue to track this as we need to track the number of docs across the threads and the implementation will require some discussion.
Describe the solution you'd like
forceTermination should be properly supported in concurrent segment search.