apache / lucene

Apache Lucene open-source search software
https://lucene.apache.org/
Apache License 2.0
2.58k stars 1.01k forks source link

DocumentsWriterDeleteQueue.getNextSequenceNumber assertion failure seqNo=9 vs maxSeqNo=8 #13571

Closed aoli-al closed 1 month ago

aoli-al commented 1 month ago

Description

I saw the following assertion failure when running TestSnapshotDeletionPolicy#testMultiThreadedSnapshotting

Caused by: java.lang.AssertionError: seqNo=9 vs maxSeqNo=8
        at org.apache.lucene.index.DocumentsWriterDeleteQueue.getNextSequenceNumber(DocumentsWriterDeleteQueue.java:567)
        at org.apache.lucene.index.DocumentsWriterDeleteQueue.updateSlice(DocumentsWriterDeleteQueue.java:286)
        at org.apache.lucene.index.DocumentsWriterPerThread.finishDocuments(DocumentsWriterPerThread.java:344)
        at org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:284)
        at org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:425)
        at org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1552)
        at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1837)
        at org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1477)
        at org.apache.lucene.index.TestSnapshotDeletionPolicy$2.run(TestSnapshotDeletionPolicy.java:302)

This failure might also be related to #13446 and #13127. Please apply the following patch to reproduce the failure. It takes ~30 seconds to show the error.

diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
index 8a8fc72ab4c..12e05293a19 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
@@ -38,7 +38,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
     int concurrency = coreCount / 4;
     concurrency = Math.max(MIN_CONCURRENCY, concurrency);
     concurrency = Math.min(MAX_CONCURRENCY, concurrency);
-    return concurrency;
+    return 3;
   }

   final int concurrency;
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 7955df5630e..ba3c6f0d3f0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -411,8 +411,33 @@ final class DocumentsWriter implements Closeable, Accountable {
       final DocumentsWriterDeleteQueue.Node<?> delNode)
       throws IOException {
     boolean hasEvents = preUpdate();
+    if (Thread.currentThread().getName().contains("t9")) {
+        try {
+            Thread.sleep(100000);
+        } catch (Exception e) {}
+    }

+    if (!Thread.currentThread().getName().contains("t0")) {
+      if (Thread.currentThread().getName().contains("t1")) {
+        try {
+          Thread.sleep(5000);
+        } catch (Exception e) {}
+      } else if (Thread.currentThread().getName().contains("t2")) {
+        try {
+          Thread.sleep(5000);
+        } catch (Exception e) {}
+      } else {
+        try {
+          Thread.sleep(1000);
+        } catch (Exception e) {}
+      }
+    }
     final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
+    if (!Thread.currentThread().getName().contains("t0") && !Thread.currentThread().getName().contains("t1") && !Thread.currentThread().getName().contains("t2")) {
+      try {
+        Thread.sleep(2000);
+      } catch (Exception e) {}
+    }
     final DocumentsWriterPerThread flushingDWPT;
     long seqNo;

@@ -437,6 +462,11 @@ final class DocumentsWriter implements Closeable, Accountable {
       }
       assert dwpt.isHeldByCurrentThread() == false : "we didn't release the dwpt even on abort";
     }
+    if (!Thread.currentThread().getName().contains("t0")) {
+      try {
+        Thread.sleep(50000);
+      } catch (Exception e) {}
+    }

     if (postUpdate(flushingDWPT, hasEvents)) {
       seqNo = -seqNo;
@@ -542,6 +572,9 @@ final class DocumentsWriter implements Closeable, Accountable {

   synchronized long resetDeleteQueue(int maxNumPendingOps) {
     final DocumentsWriterDeleteQueue newQueue = deleteQueue.advanceQueue(maxNumPendingOps);
+    try {
+      Thread.sleep(5000);
+    } catch (Exception e) {}
     assert deleteQueue.isAdvanced();
     assert newQueue.isAdvanced() == false;
     assert deleteQueue.getLastSequenceNumber() <= newQueue.getLastSequenceNumber();
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 170966e8ae4..7f04fabb9b2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -574,6 +574,12 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
       } finally {
         perThreadPool.unlockNewWriters();
       }
+      if (Thread.currentThread().getName().contains("t0")) {
+        try {
+          Thread.sleep(5000);
+        } catch (Exception e) {
+        }
+      }
     }
     final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<>();
     for (final DocumentsWriterPerThread next :
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index d69a71bfea5..8cc4ce848b7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -140,6 +140,11 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
     final long ramBytesUsed = state.ramBytesUsed();
     assert contains(state)
         : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
+    if (Thread.currentThread().getName().contains("t0")) {
+      try {
+        Thread.sleep(2000);
+      } catch (Exception e) {}
+    }
     freeList.addAndUnlock(state, ramBytesUsed);
   }

Some observations to reproduce the failure:

Basically, one thread tries to close the DocumentsWriterDeleteQueue, but other threads still try to call getNextSequenceNumber. Here is the happens-before order:

t0: flushControl.obtainAndLock();
t3-t9: flushControl.obtainAndLock();
t0: deleteQueue.advanceQueue(maxNumPendingOps); // maxNumPendingOps = 7
// Here delete queue is updated but not assigned to DocumentWriter.deletetQueue
t3-t9: getNextSequenceNumber()
t1: perThreadPool.getAndLock(); // returns the stale deleteQueue from `freeList`
t1: getNextSequenceNumber()
t2: perThreadPool.getAndLock(); // returns the stale deleteQueue from `freeList`
t2: getNextSequenceNumber()
t0: close // assertion error

Version and environment details

Commit: 33a4c1d8ef999902dacedde9c7f04a3c7e2e78c9 Java version: openjdk 21.0.3 2024-04-16

aoli-al commented 1 month ago

Hi @jpountz and @benwtrent, I saw your previous discussion about the test failure #13127 . Do you think this patch will help you to replay the failure? Also, let me know if you have any questions

benwtrent commented 1 month ago

This is indeed interesting @aoli-al I will have to dig deeper to see. This part of the codebase is...opaque to say the least :/

benwtrent commented 1 month ago

I dug a little bit into this. I tried protecting by putting synchronized on getNextSequenceNumber that didn't work.

I tried putting synchronize on the DW when it flushes all threads and calls close on the DWDQ, that didn't work.

This is exceptionally tricky because the danged deleteQueue is passed around to many different classes and accessed through many different paths. Its not well encapsulated at all :(

aoli-al commented 1 month ago

Thanks for confirming this! Yes, I found the bug extremely tricky to trigger while trying to reproduce.

Making DocumentsWriterFlushControl:obtainAndLock synchronized will make the exception disappear. But I don't know if this will introduce any deadlock to any other tests.