moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

Some issues with the new segmented store #710

Open hylkevds opened 1 year ago

hylkevds commented 1 year ago

Testing the latest master with my stress-test app (many clients, connects, disconnects, etc.), I run into some exceptions.

As soon as the store opens a new page file this happens:

java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(23) + length(23) exceeds writerIndex(23): PooledSlicedByteBuf(ridx: 0, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 56, widx: 56, cap: 64))
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:49)
    at io.moquette.broker.SessionEventLoop.run(SessionEventLoop.java:34)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(23) + length(23) exceeds writerIndex(23): PooledSlicedByteBuf(ridx: 0, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 56, widx: 56, cap: 64))
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:895)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:903)
    at io.netty.buffer.WrappedByteBuf.readBytes(WrappedByteBuf.java:657)
    at io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:498)
    at io.moquette.persistence.SegmentPersistentQueue$SerDes.writePayload(SegmentPersistentQueue.java:53)
    at io.moquette.persistence.SegmentPersistentQueue$SerDes.write(SegmentPersistentQueue.java:41)
    at io.moquette.persistence.SegmentPersistentQueue$SerDes.toBytes(SegmentPersistentQueue.java:26)
    at io.moquette.persistence.SegmentPersistentQueue.enqueue(SegmentPersistentQueue.java:125)
    at io.moquette.persistence.SegmentPersistentQueue.enqueue(SegmentPersistentQueue.java:16)
    at io.moquette.broker.Session.sendPublishQos2(Session.java:328)
    at io.moquette.broker.Session.sendPublishOnSessionAtQos(Session.java:257)
    at io.moquette.broker.Session.sendNotRetainedPublishOnSessionAtQos(Session.java:243)
    at io.moquette.broker.PostOffice.publishToSession(PostOffice.java:519)
    at io.moquette.broker.PostOffice.publishToSession(PostOffice.java:508)
    at io.moquette.broker.PostOffice.lambda$publish2Subscribers$3(PostOffice.java:482)
    at io.moquette.broker.PostOffice$BatchingPublishesCollector.lambda$routeBatchedPublishes$0(PostOffice.java:439)
    at io.moquette.broker.SessionCommand.execute(SessionCommand.java:23)
    at io.moquette.broker.PostOffice.lambda$routeCommand$5(PostOffice.java:635)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:46)
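The pattern in the trace above — `readerIndex(23) + length(23) exceeds writerIndex(23)`, i.e. the reader index is already at the end when a second read of the full payload is attempted — can be reproduced deterministically with a plain `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` (this is an illustrative sketch, not Moquette code):

```java
import java.nio.ByteBuffer;

public class SharedIndexDemo {
    // Returns true if a second full read of the same buffer fails:
    // the first read advances the shared position to the limit, so the
    // next consumer sees zero readable bytes -- the java.nio analogue of
    // "readerIndex(23) + length(23) exceeds writerIndex(23)".
    static boolean secondReadFails() {
        ByteBuffer payload = ByteBuffer.allocate(23);
        payload.put(new byte[23]);
        payload.flip();                      // position=0, limit=23

        byte[] dst = new byte[23];
        payload.get(dst);                    // first consumer: ok, position -> 23
        try {
            payload.get(dst);                // second consumer: underflows
            return false;
        } catch (java.nio.BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second read fails: " + secondReadFails());
    }
}
```

In other words, any second consumer that reads through the same buffer object inherits whatever position the first consumer left behind.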

After that there are all sorts of other exceptions, but those are probably caused by the inconsistent state of the store:

java.util.concurrent.ExecutionException: java.nio.BufferUnderflowException
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:49)
    at io.moquette.broker.SessionEventLoop.run(SessionEventLoop.java:34)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.BufferUnderflowException: null
    at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:643)
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165)
    at io.moquette.persistence.SegmentPersistentQueue$SerDes.fromBytes(SegmentPersistentQueue.java:86)
    at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:148)
    at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:16)
    at io.moquette.broker.Session.drainQueueToConnection(Session.java:414)
    at io.moquette.broker.Session.processPubComp(Session.java:230)
    at io.moquette.broker.MQTTConnection.lambda$processPubComp$0(MQTTConnection.java:112)
    at io.moquette.broker.SessionCommand.execute(SessionCommand.java:23)
    at io.moquette.broker.PostOffice.lambda$routeCommand$5(PostOffice.java:635)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:46)
    ... 2 common frames omitted

I'll try to track down the details.

hylkevds commented 1 year ago

I'm getting closer to identifying the problem. It must be a threading issue. I just got:

java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(23) exceeds writerIndex(23): PooledSlicedByteBuf(ridx: 0, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 56, widx: 56, cap: 64))

Since 0 + 23 definitely does not exceed 23, the numbers must have been changed by another thread between the check and the construction of the exception message. That should never happen, and it would explain both the weird symptoms and why the problem doesn't show up in the unit tests.

hylkevds commented 1 year ago

Ok, I found the cause of this exception: the different threads all work on the same buffer, with the same indexes. Ideally we would make one retainedDuplicate for each thread; this PR just makes a duplicate when reading the buffer: #716. Not quite as nice, but it has the advantage that we don't need to reset the reader index.
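The effect of duplicating on read can be sketched with `java.nio.ByteBuffer.duplicate()`, which like Netty's `ByteBuf.duplicate()` shares the content but has its own independent indexes (illustrative sketch, not the actual #716 change):

```java
import java.nio.ByteBuffer;

public class DuplicateOnRead {
    // Reading through a duplicate leaves the original's position untouched,
    // so there is no need to reset the reader index afterwards.
    static int positionAfterDuplicateRead() {
        ByteBuffer shared = ByteBuffer.allocate(23);
        shared.put(new byte[23]);
        shared.flip();                        // position=0, limit=23

        ByteBuffer view = shared.duplicate(); // same bytes, own position/limit
        view.get(new byte[23]);               // consume through the view only

        return shared.position();             // still 0: next reader is unaffected
    }

    public static void main(String[] args) {
        System.out.println("original position: " + positionAfterDuplicateRead());
    }
}
```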

However, there are more issues:

19:52:46.403 [Session Executor 11] INFO     i.m.broker.SessionEventLoop - SessionEventLoop Session Executor 11 reached exception in processing command
java.util.concurrent.ExecutionException: java.nio.BufferUnderflowException
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:49)
    at io.moquette.broker.SessionEventLoop.run(SessionEventLoop.java:34)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.BufferUnderflowException: null
    at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:643)
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165)
    at io.moquette.persistence.SegmentPersistentQueue$SerDes.fromBytes(SegmentPersistentQueue.java:87)
    at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:149)
    at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:17)
    at io.moquette.broker.Session.drainQueueToConnection(Session.java:414)
    at io.moquette.broker.Session.processPubComp(Session.java:230)
    at io.moquette.broker.MQTTConnection.lambda$processPubComp$0(MQTTConnection.java:116)
    at io.moquette.broker.SessionCommand.execute(SessionCommand.java:23)
    at io.moquette.broker.PostOffice.lambda$routeCommand$5(PostOffice.java:648)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:46)
    ... 2 common frames omitted

hylkevds commented 1 year ago

Making a duplicate per Command Thread was easier than expected. I've updated #716 :)
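A per-thread duplicate means each command thread gets its own position/limit over the shared content, so concurrent full reads cannot corrupt each other. A minimal sketch with `java.nio` duplicates (with Netty one would use `retainedDuplicate()` so the reference count also covers the extra view):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PerThreadDuplicate {
    // Two threads each read the full 23-byte payload through their own
    // duplicate; neither sees the other's index movements.
    static boolean bothThreadsReadFully() {
        ByteBuffer payload = ByteBuffer.wrap(
                "23-byte-payload-demo!!!".getBytes(StandardCharsets.US_ASCII));
        boolean[] ok = new boolean[2];

        Thread[] readers = new Thread[2];
        for (int i = 0; i < readers.length; i++) {
            final int id = i;
            final ByteBuffer view = payload.duplicate(); // per-thread view
            readers[i] = new Thread(() -> {
                byte[] dst = new byte[view.remaining()];
                view.get(dst);                           // reads all 23 bytes
                ok[id] = dst.length == 23;
            });
            readers[i].start();
        }
        try {
            for (Thread t : readers) t.join();
        } catch (InterruptedException e) {
            return false;
        }
        return ok[0] && ok[1];
    }

    public static void main(String[] args) {
        System.out.println("both reads ok: " + bothThreadsReadFully());
    }
}
```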

hylkevds commented 1 year ago

After chopping up #716, there is one open question left:

Currently each Segment directly operates on a (shared) MemoryMappedBuffer of the entire Page.

I'm wondering if it wouldn't be better to give each Segment a slice() of the mapped (page) buffer. The slice would still operate directly on the page buffer, but would not interfere with the slices of other Segments. For reading and writing, the position would then be calculated relative to the slice instead of relative to the page.
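The slicing idea can be sketched with heap buffers standing in for the mapped page (illustrative only; sizes and offsets are made up): each slice starts at position 0 but its writes land in its own region of the backing page.

```java
import java.nio.ByteBuffer;

public class SegmentSlices {
    // Split one "page" buffer into two per-Segment slices. Each slice
    // addresses bytes relative to itself, yet writes go straight through
    // to the shared backing page without touching the other slice.
    static byte[] writeThroughSlices() {
        ByteBuffer page = ByteBuffer.allocate(8); // stand-in for the mapped page

        ByteBuffer view = page.duplicate();
        view.position(0);
        view.limit(4);
        ByteBuffer seg0 = view.slice();           // 4-byte window at page offset 0

        view = page.duplicate();
        view.position(4);
        view.limit(8);
        ByteBuffer seg1 = view.slice();           // 4-byte window at page offset 4

        seg0.put((byte) 1);                       // slice-relative offset 0
        seg1.put((byte) 2);                       // slice offset 0 -> page offset 4

        return page.array();                      // both writes visible in the page
    }

    public static void main(String[] args) {
        byte[] a = writeThroughSlices();
        System.out.println(a[0] + " " + a[4]);
    }
}
```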

I'm also wondering about the efficiency of the QueuePool.openNextTailSegment(String name) method. It seems to re-open the entire page file buffer for each segment in that page that is opened. Do you know if the back-end implementation is smart enough to not reserve memory for the entire file, for each segment that is opened in it? If we change the Segment implementation to work on a slice, we could instead map only the segment's part of the file to the Segment. That may be more memory efficient.
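Mapping only the segment's region is directly supported by `FileChannel.map(mode, position, size)`, which takes an offset and length, so a segment-sized window costs one segment-sized mapping rather than a whole-page one. A self-contained sketch (tiny made-up sizes, temp file instead of a real page file):

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SegmentMapping {
    // Maps only one segment-sized window of a "page" file, writes a byte
    // at segment-relative offset 0, and reads it back through a second
    // window over the same region. Returns the byte read back, or -1 on error.
    static int readBackThroughWindow() {
        try {
            Path file = Files.createTempFile("page", ".bin");
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                long segmentOffset = 1024;            // hypothetical segment start
                int segmentSize = 64;                 // hypothetical segment length

                MappedByteBuffer seg = ch.map(FileChannel.MapMode.READ_WRITE,
                        segmentOffset, segmentSize);  // window over one segment only
                seg.put(0, (byte) 42);                // segment-relative offset 0
                seg.force();                          // flush the mapped region

                MappedByteBuffer again = ch.map(FileChannel.MapMode.READ_ONLY,
                        segmentOffset, segmentSize);
                return again.get(0);                  // reads the byte we wrote
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("read back: " + readBackThroughWindow());
    }
}
```

Whether this actually saves memory depends on how the current back-end handles repeated whole-file mappings, which is the open question above.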

WDYT?