apache / lucene

Apache Lucene open-source search software
https://lucene.apache.org/
Apache License 2.0

Improve LRUQueryCache's concurrency [LUCENE-9038] #10081

Open asfimport opened 4 years ago

asfimport commented 4 years ago

LRUQueryCache appears to play a central role in Lucene's performance. There are many issues discussing its performance, such as #8290, #8292, #9075, #9260, and #10045. It appears that the cache's overhead can make it just as much of a liability as a benefit, causing various workarounds and complexity.

When reviewing the discussions and code, the following issues are concerning:

  1. The cache is guarded by a single lock for all reads and writes.
  2. All computations are performed outside of any locking to avoid penalizing other callers. This doesn't handle cache stampedes, meaning that multiple threads may each miss, compute the value, and try to store it. That redundant work becomes expensive under load and can be mitigated with roughly per-key locks.
  3. The cache queries the entry to see if it's even worth caching. At first glance one assumes this is so that inexpensive entries don't bang on the lock or thrash the LRU. However, this is also used to indicate data dependencies for uncacheable items (per JIRA), which perhaps shouldn't be invoking the cache.
  4. The cache lookup is skipped if the global lock is held; the value is then computed but not stored. This means a busy lock reduces performance across all usages and the cache's effectiveness degrades. It is also not counted in the miss rate, giving a false impression.
  5. An attempt was made to perform computations asynchronously, due to their heavy cost on tail latencies. That work was reverted due to test failures and is being worked on.
  6. An in-progress change tries to avoid LRU thrashing due to large, infrequently used items being cached.
  7. The cache is tightly intertwined with business logic, making it hard to tease apart core algorithms and data structures from the usage scenarios.

It seems that more and more items skip being cached because of concurrency overhead and hit-rate concerns, leading to special-case fixes based on knowledge of the external code flows. Since the developers are experts on search, not caching, it seems justified to evaluate if an off-the-shelf library would be more helpful in terms of developer time, code complexity, and performance. Solr has already introduced Caffeine in SOLR-8241 and SOLR-13817.

The proposal is to replace the internals of LRUQueryCache so that external usages are not affected at the API level. However, as with SolrCache, one difference is that Caffeine bounds by either the number of entries or an accumulated size (e.g. bytes), but not both constraints at once. This is likely an acceptable divergence in how the configuration is honored.
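
For illustration, a minimal sketch of the two bounding modes Caffeine offers (the key/value types and the byte-estimating weigher here are hypothetical, not Lucene's): a builder accepts either `maximumSize` or `maximumWeight`, not both.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class BoundingModes {
  public static void main(String[] args) {
    // Bounded by entry count only.
    Cache<String, long[]> byCount = Caffeine.newBuilder()
        .maximumSize(1_000)
        .build();

    // Bounded by accumulated weight only, e.g. an estimate of bytes used.
    Cache<String, long[]> byWeight = Caffeine.newBuilder()
        .maximumWeight(64L * 1024 * 1024)
        .weigher((String query, long[] docIds) -> docIds.length * Long.BYTES)
        .build();

    // A single builder cannot set both maximumSize and maximumWeight, so the
    // existing "max entries AND max bytes" configuration cannot be expressed directly.
    byCount.put("q1", new long[] {1, 2, 3});
    byWeight.put("q1", new long[] {1, 2, 3});
  }
}
```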

cc @sigram, @dsmiley


Migrated from LUCENE-9038 by Ben Manes, updated Nov 26 2019. Attachments: cache.patch (versions: 2), CaffeineQueryCache.java

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

In retrospect, a separate QueryCache should be implemented. LRUQueryCache declares in its contract that methods like onHit, onQueryEviction, etc. are executed under the global lock. This means implementations may rely on this exclusive read/write access to data structures, a requirement that cannot be supported efficiently. There are external usages of these hooks, such as in Elasticsearch, which would need to be reviewed. A deprecation and migration would be a safer approach for a transition.

asfimport commented 4 years ago

Adrien Grand (@jpountz) (migrated from JIRA)

Hi Ben

This looks like a good summary of the issues that affect our cache. I'm quite impressed by how thorough this list is!

> In retrospect, a separate QueryCache should be implemented. LRUQueryCache declares in its contract that methods like onHit, onQueryEviction, etc. are executed under the global lock. This means implementations may rely on this exclusive read/write access to data structures, a requirement that cannot be supported efficiently.

If we can build a better cache then we will find a way to transition our users to it, I wouldn't worry about the migration path yet. We'll figure something out.

> Since the developers are experts on search, not caching, it seems justified to evaluate if an off-the-shelf library would be more helpful in terms of developer time, code complexity, and performance.

We want lucene-core to be dependency-free, so we couldn't add Caffeine as a dependency of lucene-core. However, other options include having it as a dependency of a module that would expose a different cache implementation, reusing some of its ideas in the current cache implementation, or forking the code that we need.

> It appears that the cache's overhead can make it just as much of a liability as a benefit, causing various workarounds and complexity.

FYI when I implemented this cache, I went for simplicity in terms of locking, so there is certainly room for improvement. One thing that is not immediately obvious, and that makes implementing a query cache for Lucene a bit tricky, is that it needs to be able to efficiently evict all cache entries for a given segment. This is the reason why the current implementation uses two levels of maps instead of a single map that would take (Query, CacheKey) pairs as keys.
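
To make the difference concrete, a rough sketch of the two key shapes (the class and method names are hypothetical, written in modern Java for brevity): with the two-level layout a closed segment is dropped with a single removal, while a flat map keyed by (Query, CacheKey) pairs has to scan every entry.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Query;

class CacheShapes {
  // Two-level shape (what LRUQueryCache does): dropping a closed segment is one outer removal.
  final Map<IndexReader.CacheKey, Map<Query, DocIdSet>> perSegment = new ConcurrentHashMap<>();

  void onSegmentClosed(IndexReader.CacheKey segment) {
    perSegment.remove(segment); // all of that segment's entries go away together
  }

  // Flat shape: the key is a (Query, CacheKey) pair, so dropping a segment
  // means scanning every cached entry to find the ones that belong to it.
  record QuerySegmentKey(Query query, IndexReader.CacheKey segment) {}

  final Map<QuerySegmentKey, DocIdSet> flat = new ConcurrentHashMap<>();

  void onSegmentClosedFlat(IndexReader.CacheKey segment) {
    flat.keySet().removeIf(key -> Objects.equals(key.segment(), segment));
  }
}
```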

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

On the train ride to work, I started to play with stubbing out an implementation to better understand what it could look like. For now I'm just untangling things in my head due to lack of familiarity, and not expecting anything to be adopted.

> We want lucene-core to be dependency-free, so we couldn't add caffeine as a dependency of lucene-core.

I am certainly fine with that and will worry about it if I can offer something promising. In addition to the options you mentioned, we could shade / shadow the dependency to an internal package name.

> One thing that is not obvious immediately and makes implementing a query cache for Lucene a bit tricky is that it needs to be able to efficiently evict all cache entries for a given segment.

Thank you. I was trying to understand the LeafCache and was still under the impression that it was unnecessary complexity. Can you explain why caching per segment is needed? It certainly makes things a lot harder, since the caches grow as queries are cached at the segment level.

Is this so that when updates occur all of the related cached queries are invalidated, to avoid stale responses? If so, would some versioning / generation field be applicable to maintain a single-level cache? In that model the generation id is part of the key, so a simple increment causes all of the prior content to never be fetched again. This is common in remote caches (e.g. memcached) and, if doable here, we could maintain an index to proactively remove those stale entries.
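
A minimal sketch of that generation-id idea, with hypothetical names and no claim about fitting Lucene's actual API: the generation is folded into the key, so one increment makes all older entries unreachable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class GenerationalCache<K, V> {
  record GenKey<T>(long generation, T key) {}

  private final AtomicLong generation = new AtomicLong();
  private final Map<GenKey<K>, V> map = new ConcurrentHashMap<>();

  V get(K key) {
    return map.get(new GenKey<>(generation.get(), key));
  }

  void put(K key, V value) {
    map.put(new GenKey<>(generation.get(), key), value);
  }

  // "Invalidate" everything cached so far: older generations can no longer be
  // looked up, and could be swept lazily or via a generation -> keys index.
  void invalidateAll() {
    generation.incrementAndGet();
  }
}
```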

asfimport commented 4 years ago

Adrien Grand (@jpountz) (migrated from JIRA)

This is not due to invalidation but to how Lucene groups data into segments, which regularly get merged together into fewer, bigger segments. When segments get merged away, they are closed, which triggers a callback on the cache telling it that it may remove all entries that are about these segments, since they will never be used again.

Before Lucene introduced a query cache, Elasticsearch used to have its own query cache that was based on Guava and used (Query, CacheKey) pairs as keys, and it evicted all entries for a segment by iterating over all cached entries and removing those that were about this segment. This triggered some interesting behaviors when closing top-level readers, which in turn closes all their segments in sequence, each of which in turn iterates over all remaining cached entries. So if you want to cache Q queries and have S segments, then you may have up to QxS entries in your cache, and closing the reader runs in O(QxS^2); we were seeing users whose clusters would take ages to close indices because of this. One could make the argument that it is not required to evict those entries and that we could wait for them to be evicted naturally, but I don't like the idea of spending some of the JVM memory on unused data.

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

Interesting. Off the cuff...

It sounds like we'd want to orchestrate it such that a write to level-2 needs to communicate to level-1 that the instance was modified, e.g. replace(key, v1, v1). That would trigger Guava/Caffeine to re-weigh the entry and possibly trigger an eviction. If that write-back failed, e.g. the entry was removed or became v2, then the caller would have to manually call the entry's eviction logic (e.g. if closeable). The L1 would be bounded to evict segments and L2 unbounded, which matches the current implementation. The coordination would need to be handled, but shouldn't be overly tricky, if I understand correctly.
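
A minimal sketch of that coordination, assuming a Caffeine-based L1 keyed by segment and a hypothetical LeafCache stand-in for L2; the same-value replace is the write-back suggested above to get the entry re-weighed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

class TwoLevelSketch {
  // Hypothetical stand-in for the per-segment leaf cache; only its size estimate matters here.
  static final class LeafCache {
    final ConcurrentMap<String, long[]> entries = new ConcurrentHashMap<>();

    int ramBytesUsed() {
      return entries.values().stream().mapToInt(docIds -> docIds.length * Long.BYTES).sum();
    }
  }

  // L1: bounded by accumulated weight, so going over the maximum evicts whole segments.
  final Cache<Object, LeafCache> bySegment = Caffeine.newBuilder()
      .maximumWeight(32L * 1024 * 1024)
      .weigher((Object segment, LeafCache leaf) -> leaf.ramBytesUsed())
      .build();

  void put(Object segmentKey, String query, long[] docIds) {
    LeafCache leaf = bySegment.get(segmentKey, k -> new LeafCache());
    leaf.entries.put(query, docIds);                   // L2 write (unbounded within the leaf)
    bySegment.asMap().replace(segmentKey, leaf, leaf); // write back so L1 re-weighs the entry
    // If the replace fails (e.g. the leaf was concurrently removed or replaced), the caller
    // would have to run the leaf's eviction/cleanup logic itself, as described above.
  }
}
```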

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

Attached a very rough sketch of what this could look like. A cache hit would be lock-free and a miss would be performed under a per-segment computeIfAbsent. A cheap write-back would then cause the segment to be re-weighed, perhaps triggering an eviction. A lot of LRUQueryCache needs to be ported over, but I think that is straightforward. It may look a lot like the current cache in the end, but it would benefit from having concurrent data structures to work off of.

Let me know if you think this is the right direction.

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

Why would it be O(QxS^2) and not O(Q) entries to remove?

My initial impression would be to also have the key be a (Query, CacheKey) pair and let the eviction policy drop the individual entries. When a new entry is added, we would maintain a separate index, CacheKey => Set<key>, used to remove entries when invalidating the CacheKey. This secondary index would be maintained using map computations and use weak keys, which lets us ignore a few subtle races.
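
A rough sketch of that layout with hypothetical names (the weak-key handling and the subtle races mentioned above are left out):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class FlatCacheWithIndex<Q, S, V> {
  record Key<A, B>(A query, B segment) {}

  private final Map<Key<Q, S>, V> cache = new ConcurrentHashMap<>();
  // Secondary index: segment key -> the cache keys stored for that segment. In the real
  // design this would be a weak-key cache so the index dies along with the segment.
  private final Map<S, Set<Key<Q, S>>> bySegment = new ConcurrentHashMap<>();

  void put(Q query, S segment, V value) {
    Key<Q, S> key = new Key<>(query, segment);
    cache.put(key, value);
    bySegment.computeIfAbsent(segment, s -> ConcurrentHashMap.newKeySet()).add(key);
  }

  // When a segment is closed, remove just its entries instead of scanning the whole cache.
  void invalidateSegment(S segment) {
    Set<Key<Q, S>> keys = bySegment.remove(segment);
    if (keys != null) {
      keys.forEach(cache::remove);
    }
  }
}
```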

The caching policy already includes a frequency-based admission filter on top of LRU, similar in spirit to your scheme. However, in a two-layer model it would track the frequency of the leaf caches rather than of the query entries, which negates its usefulness. I think that, thanks to richer computations and better write concurrency, we could make a simpler one-layer cache work efficiently.

Do you have benchmark scenarios that I could run? If so, I might create both versions so that we could benchmark to see which is preferable.

asfimport commented 4 years ago

Adrien Grand (@jpountz) (migrated from JIRA)

> The L1 would be bounded to evict segments and L2 unbounded, which matches the current implementation.

I don't think it matches the current implementation. When going over the maximum weight, your patch would remove all cache entries from the least-recently-used segment, while the current implementation removes all cache entries from the least-recently used query? For the record, this part of the current impl is not great as it makes eviction run in linear time with the number of segments, but I couldn't find any other way that wouldn't introduce worse issues.

> Why would it be O(QxS^2) and not O(Q) entries to remove?

If we are talking about the size of the cache, then it would be about O(QxS) assuming that queries are cached on every segment. Here I was more commenting on sequentially removing entries for all segments, one segment at a time and the overall runtime of doing this with a cache that uses (Query,CacheKey) pairs as keys.

> Do you have benchmark scenarios that I could run?

Unfortunately I don't.

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

Hmm, okay, so the behavior is certainly too complex for my initial runs at the problem.

A few ideas to consider, based on evolving the cache, that could help alleviate some of the challenges:

  1. Avoid the tryLock on read by borrowing the trick of recording the access into a striped, lossy ring buffer and replaying the events under the lock. This lets you use a ConcurrentHashMap for lock-free reads. Instead of contending on a lock to perform tiny LRU reordering work, you schedule a batch and perform it under the lock to modify the non-threadsafe data structures. The striping and lossy behavior mitigate hot spots under high load.
  2. Perform the computations under a striped lock in double-checked locking fashion (see the sketch after this list). This would avoid redundant work due to dogpiling, while also allowing mostly parallel loading of independent keys (e.g. var lock = locks[key.hashCode() % locks.length]).
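
A minimal sketch of the striped double-checked computation from item 2, with hypothetical names: independent keys that land on different stripes still load in parallel, while threads racing on the same key serialize and reuse the first result.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class StripedComputingCache<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final ReentrantLock[] locks = new ReentrantLock[256]; // power of two for masking

  StripedComputingCache() {
    for (int i = 0; i < locks.length; i++) {
      locks[i] = new ReentrantLock();
    }
  }

  V get(K key, Function<K, V> loader) {
    V value = cache.get(key);              // first check: lock-free read
    if (value != null) {
      return value;
    }
    ReentrantLock lock = locks[key.hashCode() & (locks.length - 1)];
    lock.lock();                           // only one thread per stripe computes at a time
    try {
      value = cache.get(key);              // second check: another thread may have loaded it
      if (value == null) {
        value = loader.apply(key);         // the expensive work happens once per key
        cache.put(key, value);
      }
      return value;
    } finally {
      lock.unlock();
    }
  }
}
```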

This would mitigate some of the problems but also add complexity, whereas I originally hoped to reduce that burden.

asfimport commented 4 years ago

Adrien Grand (@jpountz) (migrated from JIRA)

These sound like good ideas to me.

asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

I added per-segment ring buffers for recording query hits. When a buffer is over half full, tryLock is used to optimistically drain it back onto the LRU. When an entry is added via putIfAbsent, the buffers are drained prior to the eviction. The maps were changed to ConcurrentHashMap so that reads can be lock-free when successful. This should improve read throughput, but the cache is no longer a strict LRU like your tests expect. In the face of concurrency that shouldn't be expected, so the stats and LRU-ordering tests are intentionally broken.

This design is similar to Guava's cache, which forked the legacy ConcurrentHashMap to add per-segment features. When I proposed how to add LRU there, I had stubbed the code using ConcurrentLinkedQueue read buffers. Unfortunately my intent to replace those with ring buffers never garnered interest, even though it improved performance 25x. This still suffers from hot spots since hot entries land in the same segment, but similarly it is the least invasive fix given the existing code.

I also added a simple striped locking scheme for computing the query results. This should allow only a single thread to compute the results for a given query and avoid stampede effects. A subsequent thread would block waiting for the first to finish caching the results.

How does this look?


cache.patch ```diff diff --git a/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java b/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java index da881421bf0..f431af36f55 100644 --- a/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java +++ b/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java @@ -17,6 +17,11 @@ package org.apache.lucene.search; +import static java.util.stream.Collectors.toCollection; +import static org.apache.lucene.util.RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY; +import static org.apache.lucene.util.RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY; +import static org.apache.lucene.util.RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -28,9 +33,15 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.IntStream; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReaderContext; @@ -43,10 +54,6 @@ import org.apache.lucene.util.BitDocIdSet; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.RoaringDocIdSet; -import static org.apache.lucene.util.RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY; -import static org.apache.lucene.util.RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY; -import static org.apache.lucene.util.RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED; - /** * A {@link QueryCache} that evicts queries using a LRU (least-recently-used) * eviction policy in order to remain under a given maximum size and number of @@ -88,6 +95,7 @@ import static org.apache.lucene.util.RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_U * @lucene.experimental */ public class LRUQueryCache implements QueryCache, Accountable { + private static final int MAX_CONCURRENCY_LOADS = 2048; // must be a power-of-two private final int maxSize; private final long maxRamBytesUsed; @@ -98,10 +106,11 @@ public class LRUQueryCache implements QueryCache, Accountable { // The contract between this set and the per-leaf caches is that per-leaf caches // are only allowed to store sub-sets of the queries that are contained in // mostRecentlyUsedQueries. 
This is why write operations are performed under a lock - private final Set mostRecentlyUsedQueries; + private final Map mostRecentlyUsedQueries; private final Map cache; private final ReentrantLock lock; private final float skipCacheFactor; + private final Lock[] queryLocks; // these variables are volatile so that we do not need to sync reads // but increments need to be performed under the lock @@ -129,9 +138,11 @@ public class LRUQueryCache implements QueryCache, Accountable { } this.skipCacheFactor = skipCacheFactor; - uniqueQueries = new LinkedHashMap<>(16, 0.75f, true); - mostRecentlyUsedQueries = uniqueQueries.keySet(); - cache = new IdentityHashMap<>(); + uniqueQueries = new ConcurrentHashMap<>(); + mostRecentlyUsedQueries = new LinkedHashMap<>(16, 0.75f, true); + queryLocks = IntStream.range(0, MAX_CONCURRENCY_LOADS) + .mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new); + cache = new ConcurrentHashMap<>(); lock = new ReentrantLock(); ramBytesUsed = 0; } @@ -268,30 +279,45 @@ public class LRUQueryCache implements QueryCache, Accountable { } DocIdSet get(Query key, IndexReader.CacheHelper cacheHelper) { - assert lock.isHeldByCurrentThread(); assert key instanceof BoostQuery == false; assert key instanceof ConstantScoreQuery == false; final IndexReader.CacheKey readerKey = cacheHelper.getKey(); final LeafCache leafCache = cache.get(readerKey); if (leafCache == null) { - onMiss(readerKey, key); return null; } - // this get call moves the query to the most-recently-used position + final Query singleton = uniqueQueries.get(key); if (singleton == null) { - onMiss(readerKey, key); return null; } + final DocIdSet cached = leafCache.get(singleton); - if (cached == null) { - onMiss(readerKey, singleton); - } else { - onHit(readerKey, singleton); + if ((cached == null) && !leafCache.readBuffer.offer(singleton) && lock.tryLock()) { + try { + drainReadBuffers(); + } finally { + lock.unlock(); + } } return cached; } + private void drainReadBuffers() { + assert lock.isHeldByCurrentThread(); + + for (LeafCache leafCache : cache.values()) { + leafCache.readBuffer.drainTo(query -> { + Query singleton = uniqueQueries.get(query); + if (singleton != null) { + // this get call moves the query to the most-recently-used position + mostRecentlyUsedQueries.get(query); + onHit(leafCache.key, singleton); + } + }); + } + } + private void putIfAbsent(Query query, DocIdSet set, IndexReader.CacheHelper cacheHelper) { assert query instanceof BoostQuery == false; assert query instanceof ConstantScoreQuery == false; @@ -304,7 +330,9 @@ public class LRUQueryCache implements QueryCache, Accountable { } else { query = singleton; } + mostRecentlyUsedQueries.put(query, Boolean.TRUE); final IndexReader.CacheKey key = cacheHelper.getKey(); + onMiss(key, query); LeafCache leafCache = cache.get(key); if (leafCache == null) { leafCache = new LeafCache(key); @@ -315,6 +343,7 @@ public class LRUQueryCache implements QueryCache, Accountable { cacheHelper.addClosedListener(this::clearCoreCacheKey); } leafCache.putIfAbsent(query, set); + drainReadBuffers(); evictIfNecessary(); } finally { lock.unlock(); @@ -326,10 +355,11 @@ public class LRUQueryCache implements QueryCache, Accountable { // under a lock to make sure that mostRecentlyUsedQueries and cache keep sync'ed if (requiresEviction()) { - Iterator iterator = mostRecentlyUsedQueries.iterator(); + Iterator iterator = mostRecentlyUsedQueries.keySet().iterator(); do { final Query query = iterator.next(); final int size = mostRecentlyUsedQueries.size(); + 
uniqueQueries.remove(query); iterator.remove(); if (size == mostRecentlyUsedQueries.size()) { // size did not decrease, because the hash of the query changed since it has been @@ -374,6 +404,7 @@ public class LRUQueryCache implements QueryCache, Accountable { try { final Query singleton = uniqueQueries.remove(query); if (singleton != null) { + mostRecentlyUsedQueries.remove(singleton); onEviction(singleton); } } finally { @@ -398,24 +429,37 @@ public class LRUQueryCache implements QueryCache, Accountable { cache.clear(); // Note that this also clears the uniqueQueries map since mostRecentlyUsedQueries is the uniqueQueries.keySet view: mostRecentlyUsedQueries.clear(); + uniqueQueries.clear(); onClear(); } finally { lock.unlock(); } } + // pkg-private for testing + void cleanUp() { + lock.lock(); + try { + drainReadBuffers(); + } finally { + lock.unlock(); + } + } + // pkg-private for testing void assertConsistent() { lock.lock(); try { + drainReadBuffers(); + if (requiresEviction()) { throw new AssertionError("requires evictions: size=" + mostRecentlyUsedQueries.size() + ", maxSize=" + maxSize + ", ramBytesUsed=" + ramBytesUsed() + ", maxRamBytesUsed=" + maxRamBytesUsed); } for (LeafCache leafCache : cache.values()) { - Set keys = Collections.newSetFromMap(new IdentityHashMap<>()); - keys.addAll(leafCache.cache.keySet()); - keys.removeAll(mostRecentlyUsedQueries); + Set keys = leafCache.cache.keySet().stream().map(key -> key.query) + .collect(toCollection(() -> Collections.newSetFromMap(new IdentityHashMap<>()))); + keys.removeAll(mostRecentlyUsedQueries.keySet()); if (!keys.isEmpty()) { throw new AssertionError("One leaf cache contains more keys than the top-level cache: " + keys); } @@ -451,7 +495,7 @@ public class LRUQueryCache implements QueryCache, Accountable { List cachedQueries() { lock.lock(); try { - return new ArrayList<>(mostRecentlyUsedQueries); + return new ArrayList<>(mostRecentlyUsedQueries.keySet()); } finally { lock.unlock(); } @@ -607,12 +651,14 @@ public class LRUQueryCache implements QueryCache, Accountable { private class LeafCache implements Accountable { private final Object key; - private final Map cache; + private final Map cache; + private final RingBuffer readBuffer; private volatile long ramBytesUsed; LeafCache(Object key) { this.key = key; - cache = new IdentityHashMap<>(); + cache = new ConcurrentHashMap<>(); + readBuffer = new RingBuffer(); ramBytesUsed = 0; } @@ -629,13 +675,13 @@ public class LRUQueryCache implements QueryCache, Accountable { DocIdSet get(Query query) { assert query instanceof BoostQuery == false; assert query instanceof ConstantScoreQuery == false; - return cache.get(query); + return cache.get(new IdentityKey(query)); } void putIfAbsent(Query query, DocIdSet set) { assert query instanceof BoostQuery == false; assert query instanceof ConstantScoreQuery == false; - if (cache.putIfAbsent(query, set) == null) { + if (cache.putIfAbsent(new IdentityKey(query), set) == null) { // the set was actually put onDocIdSetCache(HASHTABLE_RAM_BYTES_PER_ENTRY + set.ramBytesUsed()); } @@ -644,7 +690,7 @@ public class LRUQueryCache implements QueryCache, Accountable { void remove(Query query) { assert query instanceof BoostQuery == false; assert query instanceof ConstantScoreQuery == false; - DocIdSet removed = cache.remove(query); + DocIdSet removed = cache.remove(new IdentityKey(query)); if (removed != null) { onDocIdSetEviction(HASHTABLE_RAM_BYTES_PER_ENTRY + removed.ramBytesUsed()); } @@ -654,7 +700,29 @@ public class LRUQueryCache implements QueryCache, 
Accountable { public long ramBytesUsed() { return ramBytesUsed; } + } + + private static final class IdentityKey { + final Query query; + + public IdentityKey(Query query) { + this.query = query; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } else if (!(o instanceof IdentityKey)) { + return false; + } + return ((IdentityKey) o).query == query; + } + @Override + public int hashCode() { + return System.identityHashCode(query); + } } private class CachingWrapperWeight extends ConstantScoreWeight { @@ -727,18 +795,7 @@ public class LRUQueryCache implements QueryCache, Accountable { return in.scorerSupplier(context); } - // If the lock is already busy, prefer using the uncached version than waiting - if (lock.tryLock() == false) { - return in.scorerSupplier(context); - } - - DocIdSet docIdSet; - try { - docIdSet = get(in.getQuery(), cacheHelper); - } finally { - lock.unlock(); - } - + DocIdSet docIdSet = get(in.getQuery(), cacheHelper); if (docIdSet == null) { if (policy.shouldCache(in.getQuery())) { final ScorerSupplier supplier = in.scorerSupplier(context); @@ -838,22 +895,21 @@ public class LRUQueryCache implements QueryCache, Accountable { return in.bulkScorer(context); } - // If the lock is already busy, prefer using the uncached version than waiting - if (lock.tryLock() == false) { - return in.bulkScorer(context); - } - - DocIdSet docIdSet; - try { - docIdSet = get(in.getQuery(), cacheHelper); - } finally { - lock.unlock(); - } - + // If a cache miss, then use double-checked striped locking to avoid dog piling on a miss + DocIdSet docIdSet = get(in.getQuery(), cacheHelper); if (docIdSet == null) { if (policy.shouldCache(in.getQuery())) { - docIdSet = cache(context); - putIfAbsent(in.getQuery(), docIdSet, cacheHelper); + Lock queryLock = getQueryLock(in.getQuery()); + queryLock.lock(); + try { + docIdSet = get(in.getQuery(), cacheHelper); + if (docIdSet == null) { + docIdSet = cache(context); + putIfAbsent(in.getQuery(), docIdSet, cacheHelper); + } + } finally { + queryLock.unlock(); + } } else { return in.bulkScorer(context); } @@ -871,5 +927,74 @@ public class LRUQueryCache implements QueryCache, Accountable { return new DefaultBulkScorer(new ConstantScoreScorer(this, 0f, ScoreMode.COMPLETE_NO_SCORES, disi)); } + private Lock getQueryLock(Query query) { + int hash = in.getQuery().hashCode(); + + // Apply a supplemental hash function to defends against poor quality hash + hash = ((hash >>> 16) ^ hash) * 0x45d9f3b; + hash = ((hash >>> 16) ^ hash) * 0x45d9f3b; + hash = (hash >>> 16) ^ hash; + + int index = hash & (queryLocks.length - 1); + return queryLocks[index]; + } + } + + private static final class RingBuffer { + /** The maximum number of elements per buffer. 
*/ + static final int BUFFER_SIZE = 64; + + // Assume 4-byte references and 64-byte cache line (16 elements per line) + static final int SPACED_SIZE = BUFFER_SIZE << 4; + static final int SPACED_MASK = SPACED_SIZE - 1; + static final int OFFSET = 16; + + final AtomicReferenceArray buffer; + final AtomicLong writeCounter; + final AtomicLong readCounter; + + public RingBuffer() { + buffer = new AtomicReferenceArray<>(SPACED_SIZE); + writeCounter = new AtomicLong(); + readCounter = new AtomicLong(); + } + + public boolean offer(E e) { + long head = readCounter.get(); + long tail = writeCounter.get(); + long size = (tail - head); + if (size >= SPACED_SIZE) { + return false; + } + + if (writeCounter.compareAndSet(tail, tail + OFFSET)) { + int index = (int) (tail & SPACED_MASK); + buffer.lazySet(index, e); + // drain if over half full + return (size > (SPACED_SIZE / 2)); + } + return false; + } + + public void drainTo(Consumer consumer) { + long head = readCounter.get(); + long tail = writeCounter.get(); + long size = (tail - head); + if (size == 0) { + return; + } + do { + int index = (int) (head & SPACED_MASK); + E e = buffer.get(index); + if (e == null) { + // not published yet + break; + } + buffer.lazySet(index, null); + consumer.accept(e); + head += OFFSET; + } while (head != tail); + writeCounter.lazySet(head); + } } } ```
asfimport commented 4 years ago

Ben Manes (migrated from JIRA)

I tried running the luceneutil benchmark against this change, rebased on master. The benchmark is pretty noisy and I'm not sure how much the cache comes into play, but these were the results.

                    TaskQPS baseline      StdDevQPS my_modified_version      StdDev                Pct diff
                 Respell      184.34     (32.4%)      167.09     (34.9%)   -9.4% ( -57% -   85%)
                  Fuzzy1      213.08     (15.1%)      202.41     (15.4%)   -5.0% ( -30% -   30%)
   BrowseMonthSSDVFacets     1789.91     (10.4%)     1759.04     (11.6%)   -1.7% ( -21% -   22%)
                 LowTerm     3172.83     (11.3%)     3149.06     (11.1%)   -0.7% ( -20% -   24%)
         LowSloppyPhrase      510.21     (12.6%)      505.35      (5.4%)   -1.0% ( -16% -   19%)
               OrHighLow      911.22     (11.4%)      907.11      (8.7%)   -0.5% ( -18% -   22%)
             MedSpanNear      639.59     (14.3%)      637.37     (11.8%)   -0.3% ( -23% -   29%)
       HighTermMonthSort     1410.18     (14.8%)     1414.44     (17.8%)    0.3% ( -28% -   38%)
              OrHighHigh      282.72     (18.9%)      283.90     (27.8%)    0.4% ( -38% -   58%)
              AndHighLow     1811.44     (16.5%)     1826.13      (8.5%)    0.8% ( -20% -   30%)
               LowPhrase      830.24     (12.8%)      837.28      (9.7%)    0.8% ( -19% -   26%)
BrowseDayOfYearSSDVFacets     1538.60      (9.5%)     1552.58     (11.5%)    0.9% ( -18% -   24%)
                HighTerm     1010.87     (11.3%)     1020.64      (9.9%)    1.0% ( -18% -   24%)
               MedPhrase      571.41     (11.5%)      579.31      (7.3%)    1.4% ( -15% -   22%)
         MedSloppyPhrase      417.12     (21.1%)      423.51     (21.7%)    1.5% ( -34% -   56%)
             LowSpanNear      746.19     (18.1%)      758.25     (12.9%)    1.6% ( -24% -   39%)
                Wildcard      184.23     (29.0%)      187.63     (29.3%)    1.8% ( -43% -   84%)
    BrowseDateTaxoFacets     2747.64     (15.6%)     2804.34     (14.5%)    2.1% ( -24% -   38%)
BrowseDayOfYearTaxoFacets     6748.62      (7.1%)     6900.47      (6.1%)    2.3% ( -10% -   16%)
             AndHighHigh      608.66     (11.9%)      622.76     (16.0%)    2.3% ( -22% -   34%)
              AndHighMed     1974.49     (14.2%)     2031.35     (10.3%)    2.9% ( -18% -   31%)
                  Fuzzy2       19.26     (69.9%)       19.84     (54.5%)    3.0% ( -71% -  423%)
                 MedTerm     2809.96      (9.1%)     2900.82     (10.7%)    3.2% ( -15% -   25%)
    HighIntervalsOrdered      253.46     (37.7%)      261.87     (43.4%)    3.3% ( -56% -  135%)
   BrowseMonthTaxoFacets     6838.39      (8.4%)     7109.56      (8.4%)    4.0% ( -11% -   22%)
        HighSloppyPhrase      379.10     (20.3%)      395.81     (20.4%)    4.4% ( -30% -   56%)
   HighTermDayOfYearSort      498.94     (15.7%)      527.78     (13.5%)    5.8% ( -20% -   41%)
                PKLookup      158.51     (27.8%)      169.54     (12.9%)    7.0% ( -26% -   65%)
                 Prefix3      168.46     (38.7%)      180.95     (36.4%)    7.4% ( -48% -  134%)
              HighPhrase      260.05     (34.0%)      279.62     (20.5%)    7.5% ( -35% -   94%)
                  IntNRQ      598.33     (33.7%)      651.97     (33.9%)    9.0% ( -43% -  115%)
               OrHighMed      378.56     (32.7%)      427.55     (16.9%)   12.9% ( -27% -   93%)
            HighSpanNear      217.85     (37.3%)      249.79     (36.3%)   14.7% ( -42% -  140%)