apache / bookkeeper

Apache BookKeeper - a scalable, fault tolerant and low latency storage service optimized for append-only workloads
https://bookkeeper.apache.org/
Apache License 2.0

ArrayIndexOutOfBoundsException on ConcurrentLongHashMap if autoshrink is true. #4316

Closed dlg99 closed 6 months ago

dlg99 commented 6 months ago

BUG REPORT

Describe the bug

Similar to https://github.com/apache/bookkeeper/issues/1606, but it only happens when autoShrink is true. It was introduced either by https://github.com/apache/bookkeeper/pull/3074 or by a subsequent change related to autoShrink. @lordcheng10 fyi

Error in prod

ERROR org.apache.bookkeeper.proto.WriteEntryProcessor - Unexpected exception while writing 3901@24558 : Index 34 out of bounds for length 32
java.lang.ArrayIndexOutOfBoundsException: Index 34 out of bounds for length 32
        at org.apache.bookkeeper.util.collections.ConcurrentLongHashMap$Section.get(ConcurrentLongHashMap.java:357) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.util.collections.ConcurrentLongHashMap.get(ConcurrentLongHashMap.java:204) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.bookie.BookieImpl.addEntryInternal(BookieImpl.java:937) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.bookie.BookieImpl.addEntry(BookieImpl.java:1074) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.WriteEntryProcessor.processPacket(WriteEntryProcessor.java:79) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.PacketProcessorBase.run(PacketProcessorBase.java:202) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.BookieRequestProcessor.processAddRequest(BookieRequestProcessor.java:655) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.BookieRequestProcessor.processRequest(BookieRequestProcessor.java:377) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.BookieRequestHandler.channelRead(BookieRequestHandler.java:90) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at org.apache.bookkeeper.proto.AuthHandler$ServerSideHandler.channelRead(AuthHandler.java:89) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at org.apache.bookkeeper.proto.BookieProtoEncoding$RequestDecoder.channelRead(BookieProtoEncoding.java:477) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at org.apache.bookkeeper.stats.ThreadRegistry$RegisteredRunnable.run(ThreadRegistry.java:146) ~[org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at java.lang.Thread.run(Thread.java:842) ~[?:?]


To Reproduce

diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index f1372b2894..f27d6f335c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -348,6 +348,7 @@ public class ConcurrentLongHashMapTest {
         assertEquals(map.size(), n);
     }

+
     @Test
     public void concurrentInsertions() throws Throwable {
         ConcurrentLongHashMap<String> map =
@@ -488,6 +489,171 @@ public class ConcurrentLongHashMapTest {
         executor.shutdown();
     }

+    @Test
+    public void stressConcurrentInsertionsAndReads2() throws Throwable {
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder()
+                        .concurrencyLevel(4)
+                        .expectedItems(4)
+//                        .expandFactor(1.1f)
+//                        .shrinkFactor(1.1f)
+                        .autoShrink(true)
+                        .build();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int writeThreads = 8;
+        final int readThreads = 8;
+        final int n = 1_000_000;
+        String[] values = new String[] {
+                "v",
+                "vv",
+                "vvv",
+                "vvvv",
+                "vvvvv",
+                "vvvvvv",
+                "vvvvvvv",
+                "vvvvvvvv",
+                "vvvvvvvvv",
+                "vvvvvvvvvv",
+        };
+        final int numValues = values.length;
+
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+
+        System.out.println("Starting writes");
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]);
+                }
+            }));
+        }
+
+        System.out.println("Starting reads");
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    String value = map.get(key);
+                    if (value != null) {
+                        assertEquals(values[(int) Math.abs(key % numValues)], value);
+                    }
+                }
+            }));
+        }
+
+        System.out.println("Waiting for futures");
+        int count = 0;
+        for (Future<?> future : futures) {
+            future.get();
+            count++;
+            if (count % 1000 == 0) {
+                System.out.println("Completed " + count + " futures out of " + futures.size());
+            }
+        }
+
+        assertEquals(map.size(), n * writeThreads);
+
+        futures.clear();
+        barrier.reset();
+
+        System.out.println("Starting removes");
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]);
+                    map.remove(key);
+                    String value = map.get(key);
+                    assertNull(value);
+
+                }
+            }));
+        }
+
+        System.out.println("Starting reads 2");
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+
+            //for (int k = 0; k < 4; k++) {
+                futures.add(executor.submit(() -> {
+                    Random random = new Random(threadIdx);
+
+                    try {
+                        barrier.await();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    for (int j = 0; j < n; j++) {
+                        long key = random.nextLong();
+                        // Ensure keys are uniques
+                        key -= key % (threadIdx + 1);
+
+                        String value = map.get(key);
+                        if (value != null) {
+                            assertEquals(values[(int) Math.abs(key % numValues)], value);
+                        }
+                    }
+                }));
+            //}
+        }
+
+        System.out.println("Waiting for futures 2");
+        count = 0;
+        for (Future<?> future : futures) {
+            future.get();
+            count++;
+            if (count % 1000 == 0) {
+                System.out.println("Completed " + count + " futures out of " + futures.size());
+            }
+        }
+        futures.clear();
+
+        executor.shutdown();
+    }
+
     @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map =

dao-jun commented 6 months ago

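It looks like this issue is caused by the rehash method.

Executing rehash requires the write lock, but the following code in get:

                    int capacity = this.capacity;
                    bucket = signSafeMod(bucket, capacity);

                    // First try optimistic locking
                    long storedKey = keys[bucket];
                    V storedValue = values[bucket];

does not take a read lock. If a shrinking rehash runs between the read of capacity and the read of keys, the bucket is computed from the old, larger capacity and then used to index the new, smaller array, which is consistent with "Index 34 out of bounds for length 32".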
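
To illustrate the race described above, here is a minimal sketch of one way an optimistic reader can stay in bounds while the table is being resized. It is not BookKeeper's code and not necessarily what either linked PR does: the class `SketchSection`, its inner `Table` holder, and the `String` value type are all hypothetical names made up for this example. The idea is that keys and values are published together through a single reference, so a reader always derives the capacity from the same arrays it indexes, and falls back to a read lock when `StampedLock.validate` fails.

```java
import java.util.concurrent.locks.StampedLock;

/** Hypothetical, simplified open-addressing section (illustration only). */
final class SketchSection {
    /** Keys and values travel together, so a reader never pairs a stale capacity with new arrays. */
    private static final class Table {
        final long[] keys;
        final String[] values; // a null value marks an empty slot in this sketch

        Table(int capacity) {
            keys = new long[capacity];
            values = new String[capacity];
        }
    }

    private final StampedLock lock = new StampedLock();
    private volatile Table table;
    private int size; // only touched under the write lock

    SketchSection(int capacity) { // capacity must be >= 1
        table = new Table(capacity);
    }

    String get(long key) {
        long stamp = lock.tryOptimisticRead();
        Table snapshot = table; // single read: capacity and arrays come from one object
        String result = probe(snapshot, key);
        if (lock.validate(stamp)) {
            return result;
        }
        // A writer interfered: retry under a real read lock.
        stamp = lock.readLock();
        try {
            return probe(table, key);
        } finally {
            lock.unlockRead(stamp);
        }
    }

    void put(long key, String value) {
        long stamp = lock.writeLock();
        try {
            if (insert(table, key, value)) {
                size++;
            }
            if (size * 2 > table.keys.length) {
                rehash(table.keys.length * 2);
            }
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /** Builds the new table completely, then publishes it with one reference write. */
    private void rehash(int newCapacity) {
        Table old = table;
        Table next = new Table(newCapacity);
        for (int i = 0; i < old.keys.length; i++) {
            if (old.values[i] != null) {
                insert(next, old.keys[i], old.values[i]);
            }
        }
        table = next;
    }

    /** Linear-probing insert; returns true if the key was not present before. */
    private static boolean insert(Table t, long key, String value) {
        int cap = t.keys.length;
        int b = bucket(key, cap);
        while (t.values[b] != null && t.keys[b] != key) {
            b = (b + 1) % cap;
        }
        boolean added = t.values[b] == null;
        t.keys[b] = key;
        t.values[b] = value;
        return added;
    }

    /** Bounded probe: every index is derived from the length of the arrays actually read. */
    private static String probe(Table t, long key) {
        int cap = t.keys.length;
        int b = bucket(key, cap);
        for (int i = 0; i < cap; i++) {
            String v = t.values[b];
            if (v == null) {
                return null;
            }
            if (t.keys[b] == key) {
                return v;
            }
            b = (b + 1) % cap;
        }
        return null;
    }

    private static int bucket(long key, int cap) {
        return (int) Math.floorMod(key, (long) cap);
    }
}
```

An optimistic probe may still observe a half-written slot, but it cannot index past the end of its snapshot, and the failed `validate` discards that result. A shrinking rehash would publish a smaller `Table` in the same way; removal is left out to keep the sketch short.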

dao-jun commented 6 months ago

I pushed a PR that fixes the issue, and it passes your test: https://github.com/apache/bookkeeper/pull/4317

thetumbled commented 6 months ago

This bug was found a long time ago, but the fix was only merged into the Pulsar repository, not into BookKeeper. I think it is time to merge this PR to fix the problem, thanks. https://github.com/apache/bookkeeper/pull/4066

dao-jun commented 6 months ago

Good! @thetumbled

shoothzj commented 6 months ago

Closed by #4066.