opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Concurrent Segment Search] Functional test coverage #9468

Closed: sohami closed this issue 7 months ago

sohami commented 1 year ago

Describe the solution you'd like

This is a tracking issue to verify the functionality of concurrent segment search beyond the Java IT tests, which are being enabled as part of a separate issue: https://github.com/opensearch-project/OpenSearch/issues/7440

We can use the r5.xlarge instance type for the tests below:

  1. Run all the OSB workloads with concurrent segment search enabled, a slice count of 0, and multiple clients (say 2 or 4); the relevant settings are sketched below. For each run, ensure there are no errors or failures during execution and that the results are correct and identical to runs with concurrent search disabled.
  2. Perform a reindex operation with concurrent segment search enabled. During reindex, documents are first queried from the source index and then indexed into a target index, so this will catch any issues with querying documents using concurrent segment search.
  3. Run a few OSB workloads with concurrent segment search enabled on SegRep-enabled indices and verify that i) the results are the same as for doc-rep-enabled indices and ii) no failures are seen during the run with SegRep indices. For this we will need a multi-node setup with replicas so that searches can also be served by replicas.
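
For reference, the two concurrent search settings exercised here are static opensearch.yml settings (passed via additionalConfig in the cluster setups below); a minimal sketch for the slice-count-0 case:

# opensearch.yml (values taken from the ADDITIONAL_CONFIG used later in this issue)
opensearch.experimental.feature.concurrent_segment_search.enabled: true
# 0 defers slicing to Lucene's default heuristic; a positive value caps the slice count
search.concurrent.max_slice_count: 0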
neetikasinghal commented 1 year ago

I am working on this.

neetikasinghal commented 1 year ago

Functional Testing for Concurrent Search

Overview

As part of functional testing for concurrent search, we need to validate that the different search operations work as before, without any regression. To ensure that, the tests are run across the following workloads:

Different workloads in OpenSearch Benchmark (ref: link): nyc_taxis, geonames, geopoint, geopointshape, geoshape, nested, noaa, percolator, pmc, http_logs.

Cluster creation

The following three clusters are created with the help of opensearch-cluster-cdk.
Create cluster command:
cdk deploy "*" \
--context region=$REGION \
--context securityDisabled=true \
--context minDistribution=$MIN_DISTRIBUTION \
--context distributionUrl=$DISTRIBUTION_URL \
--context cpuArch=$CPUARCH \
--context singleNodeCluster=$SINGLENODE \
--context distVersion=$DISTRO \
--context serverAccessType=$ACCESSTYPE \
--context restrictServerAccessTo=$ACCESS_PREFIX \
--context dataInstanceType=$INSTANCE_TYPE \
--context storageVolumeType=$STORAGE_VOL_TYPE \
--context dataNodeStorage=$STORAGE_SIZE \
--context use50PercentHeap=true \
--context suffix=$SUFFIX \
--context additionalConfig=$ADDITIONAL_CONFIG
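
Once the stack is deployed, a generic reachability check (not part of the original runs; $ENDPOINT is a placeholder for the loadbalancerurl the stack prints) confirms the cluster is healthy before ingestion:

# expect a green or yellow status before starting benchmarks
curl -s "http://$ENDPOINT/_cluster/health?pretty"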

Data ingestion

On every cluster, the benchmark is run for all the workloads as follows:

Cluster 1: neetiks-1-cs-enabled-slice0
opensearch-infra-stack-neetiks-1-cs-enabled-slice0.loadbalancerurl = 
opensearch-infra-stack-neetiks-1-cs-enabled-slice0.privateip =
MIN_DISTRIBUTION=true
DISTRIBUTION_URL=https://artifacts.opensearch.org/snapshots/core/opensearch/2.10.0-SNAPSHOT/opensearch-min-2.10.0-SNAPSHOT-linux-x64-latest.tar.gz
CPUARCH='x64'
SINGLENODE=true
HEAP50PCT=true
SUFFIX='neetiks-1-cs-enabled-slice0'
ACCESSTYPE='prefixList'
ACCESS_PREFIX='pl-4e2ece27'
DISTRO='2.10.0'
INSTANCE_TYPE=r5.8xlarge
ADDITIONAL_CONFIG='{"opensearch.experimental.feature.concurrent_segment_search.enabled": "true", "search.concurrent.max_slice_count": "0"}'
STORAGE_SIZE=1000
STORAGE_VOL_TYPE='gp3'
#!/bin/sh
array=(
nyc_taxis
geonames
geopoint
geopointshape
geoshape
nested
noaa
percolator
pmc
http_logs
)

for index in ${!array[*]}; do
  opensearch-benchmark execute-test \
    --workload-params="target_throughput:none,number_of_replicas:0,number_of_shards:1,search_clients:2" \
    --user-tag="testcase:functional-cs-enabled-slice-0,arch:x86,instance:r5.large,cs_enabled:true,slice_count:0,num_shards:1,shard_size:default,num_replicas:0,node_count:1,client_count:2,search_threadpool:default" \
    --target-hosts '' \
    --pipeline=benchmark-only \
    --telemetry=node-stats \
    --kill-running-processes \
    --workload-path=/home/ec2-user/.benchmark/benchmarks/workloads/default/${array[$index]} \
    --enable-assertions
  sleep 60
done
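
Because the concurrent search flags are node-level opensearch.yml settings here, a hypothetical spot-check (not from the original run) is to read them back from the nodes info API:

# confirm the experimental flags were picked up ($ENDPOINT is a placeholder)
curl -s "http://$ENDPOINT/_nodes?pretty&filter_path=nodes.*.settings.opensearch.experimental"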

Cluster 2: neetiks-1-cs-enabled-slice1
opensearch-infra-stack-neetiks-1-cs-enabled-slice1.loadbalancerurl = 
opensearch-infra-stack-neetiks-1-cs-enabled-slice1.privateip =
MIN_DISTRIBUTION=true
DISTRIBUTION_URL=https://artifacts.opensearch.org/snapshots/core/opensearch/2.10.0-SNAPSHOT/opensearch-min-2.10.0-SNAPSHOT-linux-x64-latest.tar.gz
CPUARCH='x64'
SINGLENODE=true
HEAP50PCT=true
SUFFIX='neetiks-1-cs-enabled-slice1'
ACCESSTYPE='prefixList'
ACCESS_PREFIX='pl-4e2ece27'
DISTRO='2.10.0'
INSTANCE_TYPE=r5.8xlarge
ADDITIONAL_CONFIG='{"opensearch.experimental.feature.concurrent_segment_search.enabled": "true", "search.concurrent.max_slice_count": "1"}'
STORAGE_SIZE=1000
STORAGE_VOL_TYPE='gp3'
#!/bin/sh
array=(
nyc_taxis
geonames
geopoint
geopointshape
geoshape
nested
noaa
percolator
pmc
http_logs
)

for index in ${!array[*]}; do
  opensearch-benchmark execute-test \
    --workload-params="target_throughput:none,number_of_replicas:0,number_of_shards:1,search_clients:2" \
    --user-tag="testcase:functional-cs-enabled-slice-1,arch:x86,instance:r5.large,cs_enabled:true,slice_count:1,num_shards:1,shard_size:default,num_replicas:0,node_count:1,client_count:2,search_threadpool:default" \
    --target-hosts '' \
    --pipeline=benchmark-only \
    --telemetry=node-stats \
    --kill-running-processes \
    --workload-path=/home/ec2-user/.benchmark/benchmarks/workloads/default/${array[$index]} \
    --enable-assertions
  sleep 60
done

Cluster 3: neetiks-2-cs-disabled
opensearch-infra-stack-neetiks-2-cs-disabled.loadbalancerurl = 
opensearch-infra-stack-neetiks-2-cs-disabled.privateip =
MIN_DISTRIBUTION=true
DISTRIBUTION_URL=https://artifacts.opensearch.org/snapshots/core/opensearch/2.10.0-SNAPSHOT/opensearch-min-2.10.0-SNAPSHOT-linux-x64-latest.tar.gz
CPUARCH='x64'
SINGLENODE=true
HEAP50PCT=true
SUFFIX='neetiks-2-cs-disabled'
ACCESSTYPE='prefixList'
ACCESS_PREFIX='pl-4e2ece27'
DISTRO='2.10.0'
INSTANCE_TYPE=r5.8xlarge
STORAGE_SIZE=1000
STORAGE_VOL_TYPE='gp3'
#!/bin/sh
array=(
nyc_taxis
geonames
geopoint
geopointshape
geoshape
nested
noaa
percolator
pmc
http_logs
)

for index in ${!array[*]}; do
  opensearch-benchmark execute-test \
    --workload-params="target_throughput:none,number_of_replicas:0,number_of_shards:1,search_clients:2" \
    --user-tag="testcase:functional-cs-disabled,arch:x86,instance:r5.large,cs_enabled:false,slice_count:default,num_shards:1,shard_size:default,num_replicas:0,node_count:1,client_count:2,search_threadpool:default" \
    --target-hosts '' \
    --pipeline=benchmark-only \
    --telemetry=node-stats \
    --kill-running-processes \
    --workload-path=/home/ec2-user/.benchmark/benchmarks/workloads/default/${array[$index]} \
    --enable-assertions
  sleep 60
done

Search validation script

import json
import requests

if __name__ == '__main__':
    fp = open("/Users/neetiks/workspace/PycharmProjects/SnapshotDelete/benchmarks/nyc_taxis_default.json", "r")
    index_name = "nyc_taxis"
    data = json.load(fp)

    # cluster endpoints (left blank here)
    slice0_url = ''
    slice1_url = ''
    cs_disabled_url = ''

    print('dataset validation starting...')
    for entry in data:
        if entry["operation-type"] == "search":
            if entry.get("body") is None:
                continue
            request_body = entry["body"]
            search_request = index_name + "/_search?pretty"
            print('operation name: ' + str(entry["name"]))
            print('search request body : ' + str(request_body))

            try:
                # baseline response (concurrent search disabled)
                baseline_response = requests.post(cs_disabled_url + search_request, json=request_body,
                                                  headers={"Content-Type": "application/json"})
                baseline_hits = json.loads(baseline_response.text)["hits"]["total"]["value"]
                print('baseline hits = ' + str(baseline_hits))

                # slice0 response (concurrent search enabled, max_slice_count=0)
                slice0_response = requests.post(slice0_url + search_request, json=request_body,
                                                headers={"Content-Type": "application/json"})
                slice0_hits = json.loads(slice0_response.text)["hits"]["total"]["value"]
                print('slice 0 hits = ' + str(slice0_hits))
                assert baseline_hits == slice0_hits
                assert json.loads(baseline_response.text)["hits"]["hits"] == json.loads(slice0_response.text)["hits"]["hits"]

                # slice1 response (concurrent search enabled, max_slice_count=1)
                slice1_response = requests.post(slice1_url + search_request, json=request_body,
                                                headers={"Content-Type": "application/json"})
                slice1_hits = json.loads(slice1_response.text)["hits"]["total"]["value"]
                assert baseline_hits == slice1_hits
                assert json.loads(baseline_response.text)["hits"]["hits"] == json.loads(slice1_response.text)["hits"]["hits"]
                print('slice 1 hits = ' + str(slice1_hits))
            except Exception as e:
                print('error:' + str(e))
    print('dataset validation complete!!')
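
Note that the script asserts the full hits arrays in addition to the total counts, so any difference in ordering, scoring, or returned _source between the concurrent and non-concurrent clusters would also fail the run.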

Results

Dataset | Assert hits | Indices
-- | -- | --
nyc_taxis | successful | nyc_taxis
geonames | successful | geonames
geopoint | successful | osmgeopoints
geopointshape | successful | osmgeoshapes
geoshape | successful | osmlinestrings, osmmultilinestrings, osmpolygons
nested | successful | sonested
noaa | successful | weather-data-2016
percolator | successful | queries
pmc | successful | pmc
http_logs | successful | reindexed-logs, logs-181998, logs-191998, logs-201998, logs-211998, logs-221998, logs-231998, logs-241998

Reindexing

Reindex nyc_taxis on c1:
curl -XPOST "http://$c1/_reindex?pretty" -H 'Content-Type: application/json' -d '{
  "source": {
    "index": "nyc_taxis"
  },
  "dest": {
    "index": "reindexed_nyc_taxis"
  }
}'
curl -XPUT "http://$c1/reindexed_nyc_taxis/_settings" -H 'Content-Type: application/json' -d '{
  "index": {
    "number_of_replicas": 0
  }
}'

curl -XPOST "http://$c1/_reindex?pretty" -H 'Content-Type: application/json' -d '{
  "source": {
    "index": "osmpolygons"
  },
  "dest": {
    "index": "reindexed_osmpolygons"
  }
}'
curl -XPUT "http://$c1/reindexed_osmpolygons/_settings" -H 'Content-Type: application/json' -d '{
  "index": {
    "number_of_replicas": 0
  }
}'
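
As a quick sanity check on each reindex (a generic check, not part of the original run), the document counts of the source and destination indices can be compared:

# doc counts should match between each source and its reindexed copy
curl -s "http://$c1/_cat/indices/nyc_taxis,reindexed_nyc_taxis?v&h=index,docs.count"
curl -s "http://$c1/_cat/indices/osmpolygons,reindexed_osmpolygons?v&h=index,docs.count"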
When running the aggs without defining the mappings, there is an exception:
{
  "error" : {
    "root_cause" : [
      {
        "type" : "illegal_argument_exception",
        "reason" : "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [dropoff_datetime] in order to load field data by uninverting the inverted index. Note that this can use significant memory."
      }
    ],
    "type" : "search_phase_execution_exception",
    "reason" : "all shards failed",
    "phase" : "query",
    "grouped" : true,
    "failed_shards" : [
      {
        "shard" : 0,
        "index" : "reindexed_nyc_taxis",
        "node" : "NSGlIxLISR23LLs2U7w3bg",
        "reason" : {
          "type" : "illegal_argument_exception",
          "reason" : "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, ..."
        }
      }
    ],
    "caused_by" : {
      "type" : "illegal_argument_exception",
      "reason" : "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, ..."
    }
  },
  "status" : 400
}
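
The failing operations are the date aggregations: dynamic mapping on the reindexed index inferred dropoff_datetime (formatted as "yyyy-MM-dd HH:mm:ss", which is not a default dynamic date format) as text rather than date. A request of the shape below (one of the workload bodies shown later) reproduces the error:

curl -XPOST "http://$c1/reindexed_nyc_taxis/_search?pretty" -H 'Content-Type: application/json' -d '{
  "size": 0,
  "aggs": {
    "dropoffs_over_time": {
      "auto_date_histogram": { "field": "dropoff_datetime", "buckets": 20 }
    }
  }
}'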

Creating another index with mappings predefined in c1:
curl -XPUT "http://$c1/reindexed_nyc_taxis2" -H 'Content-Type: application/json' -d '{
  "settings": {
    "index": {
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "dynamic": "strict",
    "properties": {
      "cab_color": { "type": "keyword" },
      "dropoff_datetime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
      "dropoff_location": { "type": "geo_point" },
      "ehail_fee": { "type": "scaled_float", "scaling_factor": 100.0 },
      "extra": { "type": "scaled_float", "scaling_factor": 100.0 },
      "fare_amount": { "type": "scaled_float", "scaling_factor": 100.0 },
      "improvement_surcharge": { "type": "scaled_float", "scaling_factor": 100.0 },
      "mta_tax": { "type": "scaled_float", "scaling_factor": 100.0 },
      "passenger_count": { "type": "integer" },
      "payment_type": { "type": "keyword" },
      "pickup_datetime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
      "pickup_location": { "type": "geo_point" },
      "rate_code_id": { "type": "keyword" },
      "store_and_fwd_flag": { "type": "keyword" },
      "surcharge": { "type": "scaled_float", "scaling_factor": 100.0 },
      "tip_amount": { "type": "scaled_float", "scaling_factor": 100.0 },
      "tolls_amount": { "type": "scaled_float", "scaling_factor": 100.0 },
      "total_amount": { "type": "scaled_float", "scaling_factor": 100.0 },
      "trip_distance": { "type": "scaled_float", "scaling_factor": 100.0 },
      "trip_type": { "type": "keyword" },
      "vendor_id": { "type": "keyword" },
      "vendor_name": { "type": "text" }
    }
  }
}'
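
Presumably the data is then reindexed into the newly mapped index before re-running the validation; a sketch of that step:

curl -XPOST "http://$c1/_reindex?pretty" -H 'Content-Type: application/json' -d '{
  "source": { "index": "nyc_taxis" },
  "dest": { "index": "reindexed_nyc_taxis2" }
}'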
After defining the mappings for the index reindexed_nyc_taxis2, the validation script ran successfully.
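
The reindexing variant of the validation script isn't shown above; a minimal sketch, adapted from the dataset validation script (the endpoint c1_url and the file path are assumptions), whose output matches the log below:

import json
import requests

# Hypothetical adaptation of the dataset validation script: compare the source
# index against the reindexed index on the same cluster (c1_url is a placeholder).
if __name__ == '__main__':
    c1_url = ''
    with open("nyc_taxis_default.json", "r") as fp:  # same workload operations file as above
        data = json.load(fp)

    print('reindexing validation starting...')
    for entry in data:
        if entry["operation-type"] != "search" or entry.get("body") is None:
            continue
        print('operation name: ' + str(entry["name"]))
        print('search request body : ' + str(entry["body"]))
        hits = {}
        for index_name in ("nyc_taxis", "reindexed_nyc_taxis"):
            response = requests.post(c1_url + index_name + "/_search?pretty",
                                     json=entry["body"],
                                     headers={"Content-Type": "application/json"})
            hits[index_name] = json.loads(response.text)["hits"]["total"]["value"]
            print(index_name + ' hits = ' + str(hits[index_name]))
        assert hits["nyc_taxis"] == hits["reindexed_nyc_taxis"]
    print('reindexing validation complete!!')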

reindexing validation starting...
operation name: default
search request body : {'query': {'match_all': {}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: range
search request body : {'query': {'range': {'total_amount': {'gte': 5, 'lt': 15}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: distance_amount_agg
search request body : {'size': 0, 'query': {'bool': {'filter': {'range': {'trip_distance': {'lt': 50, 'gte': 0}}}}}, 'aggs': {'distance_histo': {'histogram': {'field': 'trip_distance', 'interval': 1}, 'aggs': {'total_amount_stats': {'stats': {'field': 'total_amount'}}}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: autohisto_agg
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '01/01/2015', 'lte': '21/01/2015', 'format': 'dd/MM/yyyy'}}}, 'aggs': {'dropoffs_over_time': {'auto_date_histogram': {'field': 'dropoff_datetime', 'buckets': 20}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: date_histogram_agg
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '01/01/2015', 'lte': '21/01/2015', 'format': 'dd/MM/yyyy'}}}, 'aggs': {'dropoffs_over_time': {'date_histogram': {'field': 'dropoff_datetime', 'calendar_interval': 'day'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: date_histogram_calendar_interval
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'date_histogram': {'field': 'dropoff_datetime', 'calendar_interval': 'month'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: date_histogram_calendar_interval_with_tz
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'date_histogram': {'field': 'dropoff_datetime', 'calendar_interval': 'month', 'time_zone': 'America/New_York'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: date_histogram_fixed_interval
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'date_histogram': {'field': 'dropoff_datetime', 'fixed_interval': '60d'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: date_histogram_fixed_interval_with_tz
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'date_histogram': {'field': 'dropoff_datetime', 'fixed_interval': '60d', 'time_zone': 'America/New_York'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: date_histogram_fixed_interval_with_metrics
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'date_histogram': {'field': 'dropoff_datetime', 'fixed_interval': '60d'}, 'aggs': {'total_amount': {'stats': {'field': 'total_amount'}}, 'tip_amount': {'stats': {'field': 'tip_amount'}}, 'trip_distance': {'stats': {'field': 'trip_distance'}}}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: auto_date_histogram
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'auto_date_histogram': {'field': 'dropoff_datetime', 'buckets': '12'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: auto_date_histogram_with_tz
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'auto_date_histogram': {'field': 'dropoff_datetime', 'buckets': '13', 'time_zone': 'America/New_York'}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
operation name: auto_date_histogram_with_metrics
search request body : {'size': 0, 'query': {'range': {'dropoff_datetime': {'gte': '2015-01-01 00:00:00', 'lt': '2016-01-01 00:00:00'}}}, 'aggs': {'dropoffs_over_time': {'auto_date_histogram': {'field': 'dropoff_datetime', 'buckets': '12'}, 'aggs': {'total_amount': {'stats': {'field': 'total_amount'}}, 'tip_amount': {'stats': {'field': 'tip_amount'}}, 'trip_distance': {'stats': {'field': 'trip_distance'}}}}}}
nyc_taxis hits = 10000
reindexed_nyc_taxis hits = 10000
reindexing validation complete!!

Segrep/concurrent search enabled testing

Run the nyc_taxis workload with segrep enabled:

Create cluster:
export AWS_ACCOUNT=XXXXXXXX
export REGION=us-east-2
AWS_PROFILE=testAccount
MIN_DISTRIBUTION=true
DISTRIBUTION_URL=https://artifacts.opensearch.org/snapshots/core/opensearch/2.10.0-SNAPSHOT/opensearch-min-2.10.0-SNAPSHOT-linux-x64-latest.tar.gz
CPUARCH='x64'
SINGLENODE=false
HEAP50PCT=true
SUFFIX='neetiks-4-cs-segrep-enabled-slice0'
ACCESSTYPE='prefixList'
ACCESS_PREFIX='pl-e5a5408c'
DISTRO='2.10.0'
INSTANCE_TYPE=r5.8xlarge
ADDITIONAL_CONFIG='{"opensearch.experimental.feature.concurrent_segment_search.enabled": "true", "search.concurrent.max_slice_count": "0", "opensearch.experimental.feature.segment_replication_experimental.enabled" : "true"}'
STORAGE_SIZE=1000
STORAGE_VOL_TYPE='gp3'
DATANODE_COUNT=3
MASTER_NODE_COUNT=3
cdk deploy "*" \
--context region=$REGION \
--context securityDisabled=true \
--context minDistribution=$MIN_DISTRIBUTION \
--context distributionUrl=$DISTRIBUTION_URL \
--context cpuArch=$CPUARCH \
--context singleNodeCluster=$SINGLENODE \
--context distVersion=$DISTRO \
--context serverAccessType=$ACCESSTYPE \
--context restrictServerAccessTo=$ACCESS_PREFIX \
--context dataInstanceType=$INSTANCE_TYPE \
--context storageVolumeType=$STORAGE_VOL_TYPE \
--context dataNodeStorage=$STORAGE_SIZE \
--context use50PercentHeap=true \
--context suffix=$SUFFIX \
--context additionalConfig=$ADDITIONAL_CONFIG \
--context dataNodeCount=$DATANODE_COUNT \
--context managerNodeCount=$MASTER_NODE_COUNT
Cluster endpoint: 
Run benchmark
opensearch-benchmark execute-test \
--workload-params="target_throughput:none,number_of_replicas:1,number_of_shards:2,search_clients:2" \
--user-tag="testcase:functional-cs-segrep-enabled-slice0,arch:x86,instance:r5.large,cs_enabled:true,slice_count:0,num_shards:2,shard_size:default,num_replicas:1,node_count:2,client_count:2,search_threadpool:default" \
--target-hosts '' \
--pipeline=benchmark-only \
--telemetry=node-stats \
--kill-running-processes \
--workload-path=/home/ec2-user/.benchmark/benchmarks/workloads/default/nyc_taxis \
--enable-assertions
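
One hypothetical spot-check (not from the original run) that the benchmark index was actually created with segment replication is to read its replication setting back:

# include_defaults surfaces the setting even when it was inherited rather than set explicitly
curl -s "http://$ENDPOINT/nyc_taxis/_settings?include_defaults=true&filter_path=**.replication&pretty"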
The validation script ran successfully on the SegRep and concurrent search enabled indices, using the normal cluster with both features disabled as the baseline.