moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

Use a copy of the buffer when reading for storing. #716

Closed hylkevds closed 1 year ago

hylkevds commented 1 year ago

Other threads may be reading the same buffer, so we must ensure we have our own reading position and mark.

An alternative solution would be to have PostOffice.publish2Subscribers() somehow create a duplicate for each thread.
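
For reference, a minimal sketch of the duplication idea, using a plain java.nio.ByteBuffer and made-up names purely for illustration: duplicate() shares the underlying bytes but gives each reader an independent position, limit and mark.

    import java.nio.ByteBuffer;

    // Illustration only: concurrent readers each work on their own duplicate of
    // a shared buffer, so advancing one cursor cannot corrupt another reader.
    class DuplicatePerReader {
        public static void main(String[] args) {
            ByteBuffer shared = ByteBuffer.wrap("payload".getBytes());

            Runnable reader = () -> {
                ByteBuffer view = shared.duplicate();  // same bytes, private cursor
                while (view.hasRemaining()) {
                    view.get();                        // moves only this view's position
                }
            };

            new Thread(reader).start();
            new Thread(reader).start();
        }
    }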

hylkevds commented 1 year ago

I've added a test file for the SegmentPersistentQueues that reliably fails in a single-threaded test after 2621450 messages:

org.opentest4j.AssertionFailedError: Failed on count 2621450, j 0
Caused by: java.lang.IllegalArgumentException: Can't recognize record of type: 108
    at io.moquette.persistence.SegmentPersistentQueueTest.testPerformance(SegmentPersistentQueueTest.java:195)

@andsel does that help you in tracking down the problem?

andsel commented 1 year ago

I'll check this in the next few days

hylkevds commented 1 year ago

After debugging, with messages of 81 bytes (so they take exactly 100 in the queue) and segments of 500 bytes, it seems things go wrong when the queue is completely consumed and the last message ends exactly at the end of a segment.

Because of the checks at Queue.java#L264 (which ensure we don't end up with a queue with 0 segments) the segment is not released, even though it is empty.

However, the next Read action does not go to the next segment, but instead starts over at the start of the segment it just completed. This is (of course) wrong, since it will just read random bytes, and anything can happen from there on.

I think the cleaner solution is to allow a queue to have 0 segments, and open a segment at the first write if this is the case...
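
Roughly, the lazy-open idea would look like this (a sketch only; the Segment/QueuePool names here are stand-ins, not the actual moquette classes):

    import java.nio.ByteBuffer;

    // Hypothetical sketch of a queue that is allowed to own 0 segments and only
    // grabs one from the pool at the first write after being fully drained.
    class LazySegmentQueue {
        interface Segment { void write(ByteBuffer payload); }
        interface QueuePool { Segment nextFreeSegment(String queueName); }

        private final String name;
        private final QueuePool pool;
        private Segment head;            // may be null: the queue owns 0 segments

        LazySegmentQueue(String name, QueuePool pool) {
            this.name = name;
            this.pool = pool;
        }

        void enqueue(ByteBuffer payload) {
            if (head == null) {          // fully drained earlier, segment released
                head = pool.nextFreeSegment(name);
            }
            head.write(payload);
        }
    }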

hylkevds commented 1 year ago

A quick hack to allow a queue to have 0 segments seems to work, but may cause issues when loading the queues from disk.

But then the next issue popped up. Things also go awry when reading a message where only the 4 header bytes fit in the segment and the rest is in the next segment. In that case segment.bytesAfter(scan) returns the full segment size instead of 0 (after reading the header, the segment is empty).

The problem is caused by:

    public long segmentOffset() {
        return logicalOffset % Segment.SIZE;
    }

When the logicalOffset hits the boundary (i.e. the Segment has 0 space left), the returned segment offset becomes 0 instead of Segment.SIZE. I think that to really solve this we need to store the segment number in the VirtualPointer, so that we don't need the modulo to find the segment offset, but can instead use a simple subtraction.
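
Something along these lines (a sketch only; segmentNumber is a hypothetical extra field on VirtualPointer, not something the current class has):

    // With the segment number stored in the pointer, the offset is a plain
    // subtraction from the segment start, so a pointer sitting exactly on a
    // boundary yields Segment.SIZE instead of wrapping around to 0.
    public long segmentOffset() {
        long segmentStart = (long) segmentNumber * Segment.SIZE;
        return logicalOffset - segmentStart;   // can legitimately equal Segment.SIZE
    }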

hylkevds commented 1 year ago

I managed to fix the last corner-case too.

I've made the page size and segment size configurable. This makes it easier to set up specific scenarios for testing.

hylkevds commented 1 year ago

The segmented queues now seem to work correctly.

Next up is the issue that MQTTConnection has several calls to bindedSession.flushAllQueuedMessages() which are executed on the Netty thread and access the queue. That's no longer allowed. Putting all of those on the command queue leads to queue overflows during tests; probably too many of them, so we have to come up with something else.

andsel commented 1 year ago

Hi @hylkevds I've a question. When you say that

After debugging, with messages of 81 bytes (so they take exactly 100 in the queue) and segments of 500 bytes

81 bytes is the payload to enqueue? How can that take 100 bytes in the queue? I mean that if a payload message takes 100 bytes in the queue, considering 4 bytes for the length header, the available payload size is 96, so I don't understand where the 81 comes from.

I think that we should split this PR in 2 smaller PRs:

  • one to fix the segmented queue issue that you spotted
  • another to add the copy of the buffer when it is sent to multiple destinations.

hylkevds commented 1 year ago

Hi @hylkevds I've a question. When you say that

After debugging, with messages of 81 bytes (so they take exactly 100 in the queue) and segments of 500 bytes

81 bytes is the payload to enqueue? How can that take 100 bytes in the queue? I mean that if a payload message takes 100 bytes in the queue, considering 4 bytes for the length header, the available payload size is 96, so I don't understand where the 81 comes from.

Yeah, that was not formulated the best way :) The body ended up being 81 bytes, since I had a topic of 5 bytes. With the topic and body size fields (4 each = 8), qos (1), message type (1) and the length header (4) you end up with 100 (81 + 5 + 8 + 1 + 1 + 4 = 100).

I think that we should split this PR in 2 smaller PRs:

  • one to fix the segmented queue issue that you spotted
  • another to add the copy of the buffer when it is sent to multiple destinations.

Yes, I agree. I'm still running into some exceptions, so there may be more commits added. But I could split out the queue ones, since I suspect the rest of the issues are in the threading...

hylkevds commented 1 year ago

The last problem was in the Segment implementation. The MappedByteBuffer mappedBuffer for the page is shared across Segments. But mappedBuffer.position() changes the position of this shared buffer... That means that two threads writing at the same time interfered with each other.

The quick solution is to not use the relative buffer.put(content) for writing, since that moves the shared position, but the absolute put instead:

        for (int i = startPos; i < endPos; i++)
            mappedBuffer.put(i, content.get());

I'm wondering if it isn't better to give each Segment a slice() of the mapped (page) buffer. That slice will operate directly on the page buffer, but will not interfere. For writing, instead of calculating the position relative to the page, it would have to be calculated relative to the slice.
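
As a rough illustration of that (plain NIO calls, not the actual Segment code; the parameters are made up):

    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;

    // Illustration only: carve a per-Segment view out of the shared page buffer.
    // Each slice has its own position and limit, so concurrent writers on
    // different segments no longer share a cursor.
    class SegmentSlices {
        static ByteBuffer segmentView(MappedByteBuffer page, int segmentStart, int segmentSize) {
            ByteBuffer view = page.duplicate();     // private cursor over the same page
            view.position(segmentStart);
            view.limit(segmentStart + segmentSize);
            return view.slice();                    // positions inside are segment-relative
        }
    }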

I'm also wondering about the efficiency of QueuePool.openNextTailSegment(String name). It seems to re-open the entire page file buffer for each segment in that page that is opened. Do you know if the back-end implementation is smart enough not to reserve memory for the entire file for each segment that is opened in it? If we do change the Segment implementation to work on a slice, then we could just map the segment's part of the file to the Segment. That may be more memory efficient. WDYT?
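
Mapping only the segment's range would be something like this (standard FileChannel API; the sizes and names are just illustrative, not moquette constants):

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Sketch: map just one segment's byte range of the page file instead of the
    // whole file, so each Segment only reserves its own slice of address space.
    class SegmentMapper {
        static final int SEGMENT_SIZE = 4 * 1024;   // illustrative value

        static MappedByteBuffer mapSegment(Path pageFile, long segmentStart) throws IOException {
            try (FileChannel channel = FileChannel.open(pageFile,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // The mapping remains valid after the channel is closed.
                return channel.map(MapMode.READ_WRITE, segmentStart, SEGMENT_SIZE);
            }
        }
    }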

I'll rebuild this PR into two PRs, one for the Queue part, and one for the Session part.

hylkevds commented 1 year ago

I've chopped up this PR into the parts that logically go together:

#722, #723 and #724 deal with the UnsafeQueues and depend on each other.

#725 and #726 are stand-alone issues that popped up during testing.

I've also moved the open discussion point back to issue #710 and will close this PR.

andsel commented 1 year ago

Thanks @hylkevds for all of this and for pushing it forward. Good catch on the interference on the memory-mapped buffer, and yes, I think that slicing for each segment is better in terms of performance and cleaner in terms of code clarity. I'll go through the PRs :+1: