
Shard Recovery Improvements #9996

Open · Bukhtawar opened 1 year ago

Bukhtawar commented 1 year ago

Listing the improvements:

  1. During shard relocations, before a shard can be marked active, the system makes an unlink call that can be very slow, as demonstrated below. This is common for large files, since the kernel doesn't free the inode until all of the file's block pointers have been returned to the free block list. The unlink call currently runs on the cluster applier thread, so if the file is large or the disk is slow, it can stall cluster state application and result in node drops (a sketch of one possible mitigation follows item d at the end of this list), as seen in:
    [2023-07-26T01:56:49,671][WARN ][o.e.c.s.ClusterApplierService] [f7a5b737d4c4ff5230a96eac2e48e6ba] cluster state applier task [indices_store ([[test-idx][12]] active fully on other nodes)] took [3.4m] which is above the warn threshold of [30s]: [running task [indices_store ([[test-idx][12]] active fully on other nodes)]] took [208591ms]
100.3% (501.5ms out of 500ms) cpu usage by thread 'elasticsearch[01bb1f86d2d4791f21a37ab964aa0fdf][clusterApplierService#updateTask][T#1]'
     10/10 snapshots sharing following 28 elements
       java.base@11.0.19/sun.nio.fs.UnixNativeDispatcher.unlink0(Native Method)
       java.base@11.0.19/sun.nio.fs.UnixNativeDispatcher.unlink(UnixNativeDispatcher.java:156)
       java.base@11.0.19/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:236)
       java.base@11.0.19/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105)
       java.base@11.0.19/java.nio.file.Files.delete(Files.java:1142)
       app//org.elasticsearch.core.internal.io.IOUtils$1.visitFile(IOUtils.java:243)
       app//org.elasticsearch.core.internal.io.IOUtils$1.visitFile(IOUtils.java:222)
       java.base@11.0.19/java.nio.file.Files.walkFileTree(Files.java:2725)
       java.base@11.0.19/java.nio.file.Files.walkFileTree(Files.java:2797)
       app//org.elasticsearch.core.internal.io.IOUtils.rm(IOUtils.java:222)
       app//org.elasticsearch.core.internal.io.IOUtils.rm(IOUtils.java:202)
       app//org.elasticsearch.env.NodeEnvironment.deleteShardDirectoryUnderLock(NodeEnvironment.java:523)
       app//org.elasticsearch.env.NodeEnvironment.deleteShardDirectorySafe(NodeEnvironment.java:472)
       app//org.elasticsearch.indices.IndicesService.deleteShardStore(IndicesService.java:963)
       app//org.elasticsearch.indices.store.IndicesStore$ShardActiveResponseHandler.lambda$allNodesResponded$2(IndicesStore.java:294)
       app//org.elasticsearch.indices.store.IndicesStore$ShardActiveResponseHandler$$Lambda$5075/0x00007eff660344b0.accept(Unknown Source)
       app//org.elasticsearch.cluster.service.ClusterApplierService.lambda$runOnApplierThread$0(ClusterApplierService.java:297)
       app//org.elasticsearch.cluster.service.ClusterApplierService$$Lambda$5077/0x00007eff66035cb0.apply(Unknown Source)
       app//org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.apply(ClusterApplierService.java:157)
       app//org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:393)
       app//org.elasticsearch.cluster.service.ClusterApplierService.access$000(ClusterApplierService.java:68)
       app//org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:162)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:693)
       app//org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)
       app//org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)
       java.base@11.0.19/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       java.base@11.0.19/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       java.base@11.0.19/java.lang.Thread.run(Thread.java:829)
  2. Rate limiter configurations aren't absolute: they only govern peer recovery rates and don't account for the IOPS consumed by concurrent operations such as the following (a sketch of a shared node-wide throttle also follows item d):

a) Merge

100.2% (500.8ms out of 500ms) cpu usage by thread 'elasticsearch[f0a870a4eef9cf89a409681f565d3546][[test-idx-1][28]: Lucene Merge Thread #107]'
     2/10 snapshots sharing following 13 elements
       app//org.apache.lucene.codecs.lucene80.Lucene80DocValuesConsumer.doAddSortedField(Lucene80DocValuesConsumer.java:596)
       app//org.apache.lucene.codecs.lucene80.Lucene80DocValuesConsumer.addSortedSetField(Lucene80DocValuesConsumer.java:746)
       app//org.apache.lucene.codecs.DocValuesConsumer.mergeSortedSetField(DocValuesConsumer.java:804)
       app//org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:145)
       app//org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.merge(PerFieldDocValuesFormat.java:155)
       app//org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:195)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:150)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4760)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4364)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5923)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:624)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:100)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:682)
     2/10 snapshots sharing following 13 elements
       app//org.apache.lucene.codecs.DocValuesConsumer$5$1.nextDoc(DocValuesConsumer.java:848)
       app//org.apache.lucene.codecs.lucene80.Lucene80DocValuesConsumer.addSortedSetField(Lucene80DocValuesConsumer.java:737)
       app//org.apache.lucene.codecs.DocValuesConsumer.mergeSortedSetField(DocValuesConsumer.java:804)
       app//org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:145)
       app//org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.merge(PerFieldDocValuesFormat.java:155)
       app//org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:195)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:150)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4760)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4364)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5923)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:624)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:100)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:682)
     6/10 snapshots sharing following 9 elements
       app//org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.merge(PerFieldDocValuesFormat.java:155)
       app//org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:195)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:150)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4760)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4364)

b) Refresh/Flush

100.2% (500.9ms out of 500ms) cpu usage by thread 'elasticsearch[c7326cccfef5c68da41608fefbb15e2a][generic][T#116]'
     3/10 snapshots sharing following 37 elements
       app//org.apache.lucene.index.DefaultIndexingChain.flush(DefaultIndexingChain.java:239)
       app//org.apache.lucene.index.DocumentsWriterPerThread.flush(DocumentsWriterPerThread.java:350)
       app//org.apache.lucene.index.DocumentsWriter.doFlush(DocumentsWriter.java:480)
       app//org.apache.lucene.index.DocumentsWriter.flushAllThreads(DocumentsWriter.java:660)
       app//org.apache.lucene.index.IndexWriter.getReader(IndexWriter.java:605)
       app//org.apache.lucene.index.StandardDirectoryReader.doOpenFromWriter(StandardDirectoryReader.java:290)
       app//org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:265)
       app//org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:255)
       app//org.apache.lucene.index.FilterDirectoryReader.doOpenIfChanged(FilterDirectoryReader.java:112)
       app//org.apache.lucene.index.DirectoryReader.openIfChanged(DirectoryReader.java:140)
       app//org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:66)
       app//org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:40)
       app//org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
       app//org.apache.lucene.search.ReferenceManager.maybeRefreshBlocking(ReferenceManager.java:253)
       app//org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:1681)
       app//org.elasticsearch.index.engine.InternalEngine.refreshIfNeeded(InternalEngine.java:2814)
       app//org.elasticsearch.index.engine.InternalEngine.newChangesSnapshot(InternalEngine.java:2707)
       app//org.elasticsearch.index.engine.InternalEngine.estimateNumberOfHistoryOperations(InternalEngine.java:583)
       app//org.elasticsearch.index.shard.IndexShard.estimateNumberOfHistoryOperations(IndexShard.java:2041)
       app//org.elasticsearch.indices.recovery.RecoveryTarget.hasUncommittedOperations(RecoveryTarget.java:326)
       app//org.elasticsearch.indices.recovery.RecoveryTarget.lambda$finalizeRecovery$1(RecoveryTarget.java:316)
       app//org.elasticsearch.indices.recovery.RecoveryTarget$$Lambda$4519/0x00007f495e70d900.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:325)
       app//org.elasticsearch.indices.recovery.RecoveryTarget.finalizeRecovery(RecoveryTarget.java:300)
       app//org.elasticsearch.indices.recovery.PeerRecoveryTargetService$FinalizeRecoveryRequestHandler.messageReceived(PeerRecoveryTargetService.java:321)
       app//org.elasticsearch.indices.recovery.PeerRecoveryTargetService$FinalizeRecoveryRequestHandler.messageReceived(PeerRecoveryTargetService.java:311)

c) Translog Replay

23.0% (114.7ms out of 500ms) cpu usage by thread 'elasticsearch[75cdd01fb8c6865133e66f391567166f][generic][T#194]'
     2/10 snapshots sharing following 29 elements
       app//org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:208)
       app//org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:419)
       app//org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1471)
       app//org.apache.lucene.index.IndexWriter.softUpdateDocument(IndexWriter.java:1799)
       app//org.elasticsearch.index.engine.InternalEngine.updateDocs(InternalEngine.java:1290)
       app//org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1119)
       app//org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:950)
       app//org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:869)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:841)
       app//org.elasticsearch.index.shard.IndexShard.applyTranslogOperation(IndexShard.java:1583)
       app//org.elasticsearch.index.shard.IndexShard.applyTranslogOperation(IndexShard.java:1570)
       app//org.elasticsearch.indices.recovery.RecoveryTarget.lambda$indexTranslogOperations$2(RecoveryTarget.java:370)
       app//org.elasticsearch.indices.recovery.RecoveryTarget$$Lambda$4503/0x00007f71aa2e4db8.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:325)
       app//org.elasticsearch.indices.recovery.RecoveryTarget.indexTranslogOperations(RecoveryTarget.java:345)
       app//org.elasticsearch.indices.recovery.PeerRecoveryTargetService$TranslogOperationsRequestHandler.performTranslogOps(PeerRecoveryTargetService.java:393)
       app//org.elasticsearch.indices.recovery.PeerRecoveryTargetService$TranslogOperationsRequestHandler.messageReceived(PeerRecoveryTargetService.java:352)
       app//org.elasticsearch.indices.recovery.PeerRecoveryTargetService$TranslogOperationsRequestHandler.messageReceived(PeerRecoveryTargetService.java:339)

d) Searches on a cold cache
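
On the first item, a minimal sketch of one possible direction, assuming the fix is simply to move the blocking delete off the cluster applier thread onto a background executor. `ShardStoreDeleter`, `genericExecutor`, and `deleteShardDirectoryAsync` are hypothetical names for illustration, not the existing OpenSearch API:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.stream.Stream;

    // Hypothetical sketch: hand the (potentially slow) recursive unlink work to a
    // background executor so the cluster applier thread is never blocked on disk.
    public final class ShardStoreDeleter {

        private final ExecutorService genericExecutor = Executors.newSingleThreadExecutor();

        // Called from the applier thread; returns immediately.
        public void deleteShardDirectoryAsync(Path shardPath) {
            genericExecutor.execute(() -> {
                // Depth-first delete: children before parents. Each unlink may
                // block while the kernel returns block pointers to the free list,
                // which is acceptable here because we are off the applier thread.
                try (Stream<Path> paths = Files.walk(shardPath)) {
                    paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                        try {
                            Files.deleteIfExists(p);
                        } catch (IOException e) {
                            // Log and continue; leftover files can be retried
                            // by a later store cleanup pass.
                        }
                    });
                } catch (IOException e) {
                    // Directory already gone or unreadable; nothing to do.
                }
            });
        }
    }

The only property that matters is that the applier thread hands off and returns immediately; a real change would also have to coordinate with the shard lock that deleteShardDirectoryUnderLock holds in the trace above.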
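
On the second item, a rough sketch of what an "absolute" limit could look like: a single node-wide byte budget that recovery, merge, and flush paths all draw from, so the recovery rate limiter reflects total disk pressure instead of recovery traffic alone. `NodeDiskThrottle` is a hypothetical illustration (a simple fixed-window limiter), not an existing setting or class:

    // Hypothetical sketch of a node-wide disk throughput budget shared by all
    // disk-heavy paths (peer recovery, merges, flushes) rather than a limiter
    // that only ever sees peer recovery bytes.
    public final class NodeDiskThrottle {

        private final long bytesPerSecond;
        private long windowStartNanos = System.nanoTime();
        private long bytesInWindow = 0;

        public NodeDiskThrottle(long bytesPerSecond) {
            this.bytesPerSecond = bytesPerSecond;
        }

        // Every writer (recovery chunk, merge output, flush) calls this before
        // issuing I/O, so they all consume the same budget.
        public synchronized void acquire(long bytes) throws InterruptedException {
            long now = System.nanoTime();
            if (now - windowStartNanos >= 1_000_000_000L) {
                windowStartNanos = now;   // start a new one-second window
                bytesInWindow = 0;
            }
            bytesInWindow += bytes;
            if (bytesInWindow > bytesPerSecond) {
                // Budget exhausted: sleep out the remainder of the window.
                long sleepNanos = 1_000_000_000L - (now - windowStartNanos);
                if (sleepNanos > 0) {
                    Thread.sleep(sleepNanos / 1_000_000L, (int) (sleepNanos % 1_000_000L));
                }
            }
        }
    }

With something like this in place, the per-recovery rate limit becomes a share of the node-wide budget, and a merge or flush spike would automatically slow recovery down instead of stacking on top of it.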


jainankitk commented 1 year ago

During shard relocations, before a shard can be marked active, the system makes an unlink call that can be very slow, as demonstrated below. This is common for large files, since the kernel doesn't free the inode until all of the file's block pointers have been returned to the free block list.

Wouldn't the unlink syscall be invoked after the shard is marked active? After peer recovery, the shard is marked active on the new node, and the old node then starts deleting the files.