apache / lucene

Apache Lucene open-source search software
https://lucene.apache.org/
Apache License 2.0

Use `IndexInput#prefetch` for terms dictionary lookups. #13359

Open jpountz opened 1 month ago

jpountz commented 1 month ago

This introduces TermsEnum#prepareSeekExact, which essentially calls IndexInput#prefetch at the right offset for the given term. It then takes advantage of the fact that BooleanQuery already calls Weight#scorerSupplier on all clauses before later calling ScorerSupplier#get on all clauses. So TermQuery now calls TermsEnum#prepareSeekExact in Weight#scorerSupplier (if scores are not needed), which in turn means that the I/O for terms dictionary lookups gets parallelized across all term queries of a BooleanQuery on a given segment (intra-segment parallelism).
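The key idea is the two-phase call sequence: every clause gets a chance to start its I/O before any clause blocks on it. The following is a simplified, self-contained sketch of that pattern; the class and method names are illustrative stand-ins, not Lucene's actual API.

```java
import java.util.List;

// Sketch of the two-phase pattern the PR relies on: phase 1 asks every clause
// for a supplier (where the real code would call IndexInput#prefetch), and only
// phase 2 materializes the scorers, so I/O for all clauses is in flight at once.
public class TwoPhaseSketch {

  interface ScorerSupplierLike {
    int get(); // materializing the scorer is the potentially blocking part
  }

  // Phase 1: hypothetical per-clause prefetch; the log records call order.
  static ScorerSupplierLike scorerSupplier(String term, StringBuilder log) {
    log.append("prefetch(").append(term).append(") ");
    return () -> {
      log.append("seek(").append(term).append(") ");
      return term.length();
    };
  }

  public static int score(List<String> terms, StringBuilder log) {
    // Run phase 1 across ALL clauses before running phase 2 on any of them.
    List<ScorerSupplierLike> suppliers =
        terms.stream().map(t -> scorerSupplier(t, log)).toList();
    return suppliers.stream().mapToInt(ScorerSupplierLike::get).sum();
  }
}
```

Running `score` shows all `prefetch(...)` calls happening before any `seek(...)`, which is exactly the window in which the OS can service the page faults in parallel.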

jpountz commented 1 month ago

This is a draft as I need to do more work on tests and making sure that this new method cannot corrupt the state of the SegmentTermsEnum.

But I created a benchmark that starts to look like running a real Lucene query, and the results are encouraging. It creates an index with many terms that have very short postings lists, so that running boolean queries on these terms is heavy on terms dictionary lookups rather than on reading postings. Then it manually runs a disjunction over 3 terms (some of these terms may not exist in the index, as they are created randomly), measuring how long it takes to evaluate all hits. To work properly when running a query, we'd need to move `#bulkScorer` from `Weight` to `ScorerSupplier`, which I intend to do as a follow-up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class TermsEnumPrefetchBench {

  private static final int NUM_TERMS = 3;

  public static int DUMMY;

  public static void main(String[] args) throws Exception {
    Path dirPath = Paths.get(args[0]);
    Directory dir = FSDirectory.open(dirPath);
    if (DirectoryReader.indexExists(dir) == false) {
      TieredMergePolicy mp = new TieredMergePolicy();
      mp.setSegmentsPerTier(100);
      mp.setMaxMergeAtOnce(100);
      mp.setMaxMergedSegmentMB(1024);
      try (IndexWriter w =
          new IndexWriter(
              dir, new IndexWriterConfig().setMergePolicy(mp).setRAMBufferSizeMB(1024))) {
        ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        AtomicLong indexed = new AtomicLong(0);
        for (int task = 0; task < 1000; ++task) {
          executor.execute(
              () -> {
                Random r = ThreadLocalRandom.current();
                for (int i = 0; i < 1_000; ++i) {
                  Document doc = new Document();
                  for (int j = 0; j < 10_000; ++j) {
                    doc.add(
                        new StringField(
                            "f", Long.toString(r.nextLong(20_000_000_000L)), Store.NO));
                  }
                  try {
                    w.addDocument(doc);
                  } catch (IOException e) {
                    throw new UncheckedIOException(e);
                  }
                  final long actualIndexed = indexed.incrementAndGet();
                  if (actualIndexed % 10_000 == 0) {
                    System.out.println("Indexed: " + actualIndexed);
                  }
                }
              });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
        w.commit();
        System.out.println("Start force merging");
        w.forceMerge(1);
        System.out.println("Done force merging");
        w.commit();
      }
    }
    List<Long> latencies = new ArrayList<>();
    try (IndexReader reader = DirectoryReader.open(dir)) {
      IndexSearcher searcher = new IndexSearcher(reader);
      Random r = ThreadLocalRandom.current();
      for (int i = 0; i < 10_000; ++i) {
        long start = System.nanoTime();
        BooleanQuery.Builder query = new BooleanQuery.Builder();
        for (int t = 0; t < NUM_TERMS; ++t) {
          query.add(
              new TermQuery(new Term("f", Long.toString(r.nextLong(20_000_000_000L)))),
              Occur.SHOULD);
        }
        Weight weight =
            searcher.createWeight(
                searcher.rewrite(query.build()), ScoreMode.COMPLETE_NO_SCORES, 1f);
        ScorerSupplier ss = weight.scorerSupplier(reader.leaves().get(0));
        if (ss != null) {
          Scorer scorer = ss.get(Long.MAX_VALUE);
          DocIdSetIterator iter = scorer.iterator();
          for (int d = iter.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = iter.nextDoc()) {
            DUMMY++;
          }
          long end = System.nanoTime();
          latencies.add((end - start) / 1000);
        }
      }
    }
    latencies.sort(null);
    System.out.println("P50: " + latencies.get(latencies.size() / 2));
    System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
    System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
  }
}
```

Without the change: P50: 286 P90: 403 P99: 532

With the change: P50: 148 P90: 246 P99: 368

jpountz commented 1 month ago

I iterated a bit on this change:

mikemccand commented 1 month ago

> But I created a benchmark that starts to look like running a real Lucene query, and the results are encouraging

Was this with a forced-cold index?

jpountz commented 1 month ago

It creates a 50GB terms dictionary while my machine only has ~28GB of RAM for the page cache, so many terms dictionary lookups result in page faults.

jpountz commented 1 month ago

Now that #13408 has been merged, I could update the benchmark to simply call `IndexSearcher#search`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class TermsEnumPrefetchBench {

  private static final int NUM_TERMS = 3;

  public static int DUMMY;

  public static void main(String[] args) throws Exception {
    Path dirPath = Paths.get(args[0]);
    Directory dir = FSDirectory.open(dirPath);
    if (DirectoryReader.indexExists(dir) == false) {
      TieredMergePolicy mp = new TieredMergePolicy();
      mp.setSegmentsPerTier(100);
      mp.setMaxMergeAtOnce(100);
      mp.setMaxMergedSegmentMB(1024);
      try (IndexWriter w =
          new IndexWriter(
              dir, new IndexWriterConfig().setMergePolicy(mp).setRAMBufferSizeMB(1024))) {
        ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        AtomicLong indexed = new AtomicLong(0);
        for (int task = 0; task < 1000; ++task) {
          executor.execute(
              () -> {
                Random r = ThreadLocalRandom.current();
                for (int i = 0; i < 1_000; ++i) {
                  Document doc = new Document();
                  for (int j = 0; j < 10_000; ++j) {
                    doc.add(
                        new StringField(
                            "f", Long.toString(r.nextLong(20_000_000_000L)), Store.NO));
                  }
                  try {
                    w.addDocument(doc);
                  } catch (IOException e) {
                    throw new UncheckedIOException(e);
                  }
                  final long actualIndexed = indexed.incrementAndGet();
                  if (actualIndexed % 10_000 == 0) {
                    System.out.println("Indexed: " + actualIndexed);
                  }
                }
              });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
        w.commit();
        System.out.println("Start force merging");
        w.forceMerge(1);
        System.out.println("Done force merging");
        w.commit();
      }
    }
    List<Long> latencies = new ArrayList<>();
    try (IndexReader reader = DirectoryReader.open(dir)) {
      IndexSearcher searcher = new IndexSearcher(reader);
      Random r = ThreadLocalRandom.current();
      for (int i = 0; i < 10_000; ++i) {
        long start = System.nanoTime();
        BooleanQuery.Builder query = new BooleanQuery.Builder();
        for (int t = 0; t < NUM_TERMS; ++t) {
          query.add(
              new TermQuery(new Term("f", Long.toString(r.nextLong(20_000_000_000L)))),
              Occur.SHOULD);
        }
        // Cast: totalHits.value is a long.
        DUMMY += (int) searcher.search(query.build(), 1, Sort.INDEXORDER).totalHits.value;
        long end = System.nanoTime();
        latencies.add((end - start) / 1000);
      }
    }
    latencies.sort(null);
    System.out.println("P50: " + latencies.get(latencies.size() / 2));
    System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
    System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
  }
}
```

Results still look good.

Before the change: P50: 282 P90: 387 P99: 537

After the change: P50: 161 P90: 253 P99: 379

github-actions[bot] commented 1 week ago

This PR has not had activity in the past 2 weeks, labeling it as stale. If the PR is waiting for review, notify the dev@lucene.apache.org list. Thank you for your contribution!

jpountz commented 1 week ago

I pushed a new approach. Instead of prepareSeekExact returning void, it now returns a Supplier and forbids calling any other method on TermsEnum until the Supplier has been consumed. There are two benefits:
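The contract described above can be sketched as follows. This is a self-contained illustration of the "return a Supplier and forbid other calls until it is consumed" idea, with hypothetical names; it is not Lucene's actual `TermsEnum` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: a prepareSeekExact-style method returns a Supplier,
// and the enum rejects any other call until that Supplier has been consumed.
public class GuardedTermsEnumSketch {

  private final Map<String, Integer> dictionary = new HashMap<>();
  private boolean pendingSeek = false;
  private String current;

  public GuardedTermsEnumSketch() {
    dictionary.put("lucene", 42);
  }

  /** Starts the lookup; real code would issue IndexInput#prefetch here. */
  public Supplier<Boolean> prepareSeekExact(String term) {
    pendingSeek = true; // other methods are now illegal until the supplier runs
    return () -> {
      pendingSeek = false;
      boolean found = dictionary.containsKey(term);
      current = found ? term : null;
      return found;
    };
  }

  public String term() {
    if (pendingSeek) {
      throw new IllegalStateException("consume the pending seek supplier first");
    }
    return current;
  }
}
```

Deferring the actual seek into the Supplier keeps the enum's internal state untouched until the caller commits to the lookup, which is what makes it safe to issue prefetches for many terms up front.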

The benchmark still reports similar numbers:

Without the change

P50: 307
P90: 423
P99: 585

With the change

P50: 162
P90: 258
P99: 405

jpountz commented 2 days ago

I will merge soon if there are no objections.