moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0
2.3k stars 818 forks source link

Fix/unsafe queues #722

Closed hylkevds closed 1 year ago

hylkevds commented 1 year ago

This fixes various bugs in the unsafeQueue implementation, originally from #716 :

This PR bases on #724 and #728 This PR is followed by #723

hylkevds commented 1 year ago

Hi @hylkevds thanks for your contribution. I left some comment, mainly a doubt to use null to say "no more segments for the queue" from the openNextTailSegment in QueuePool.

I think this should be covered with tests, and I can understand the tests are present in the PR #723 and #724. The #724 is the one that parameterize the size of segments and queue to ease the tests. So I think would be better to first have such PR to be merged, then a test that exposes the problem, and finally the code the fix. The last 2 steps could be merged in one PR.

I can refactor the PRs to put the test and flexible-size pages commit at the front. The test will fail until the last commit. The error message is not as helpful as you might hope, since the test can only detect that there is corruption of the queue, but not point at the cause of the corruption. Usually because the corruption happened several cycles before it becomes evident.

andsel commented 1 year ago

I can refactor the PRs to put the test and flexible-size pages commit at the front.

Yes please, it's important to separate the PR that parameterize the segment/page size, from the one that introduces the test that fails

hylkevds commented 1 year ago

Hi @hylkevds thanks for your contribution. I left some comment, mainly a doubt to use null to say "no more segments for the queue" from the openNextTailSegment in QueuePool.

QueuePool.openNextTailSegment() now returns an Optional.

hylkevds commented 1 year ago

I can refactor the PRs to put the test and flexible-size pages commit at the front.

Yes please, it's important to separate the PR that parameterize the segment/page size, from the one that introduces the test that fails

I've refactored the set of PRs, and split out some more parts. The order is now:

Separate are now:

andsel commented 1 year ago

Hi @hylkevds, please split this PR at the commit 8341efc

The other 3 commits: c9328c8 e888fa4 and 73d6488 should go in another PR. In this way the reviewing process becomes easier for me :pray:

andsel commented 1 year ago

So this PR could be considered as closed?

hylkevds commented 1 year ago

So this PR could be considered as closed?

Depends if you want to apply all these commits in one go, or as separate merges. The PRs depend on each other, so the next one always also contains the commits of the one before. If you merge #723 you also merge the commits of #724, #728 and #722 But you can first review #724, then #728, then #722 and finally #723 to keep them separate

andsel commented 1 year ago

@hylkevds #724 and #722 has been merged, please rebase on main

hylkevds commented 1 year ago

Hmm, let me go over that possible off-by-one again. Old code:

if (tailSegment.hasSpace(existingTail, fullMessageSize + 1))

Expands through bytesAfter(mark) >= length using mark => existingTail and length => fullMessageSize + 1 to:

tailSegment.bytesAfter(existingTail) >= fullMessageSize + 1

But we now have

tailSegment.bytesAfter(existingTail) + 1

Hmm, indeed, there seems to be a +1 too many... But why does it not break the Tests. They actually validate the message content. Removing either of the two +1's breaks the test (yay for tests!) so apparently it's correct... but I can't remember why I changed it! I know I've been stepping through this code in debug mode, with specific message sizes, to validate what happens. Maybe i changed it and forgot to list it as a bug that was fixed.

I'll have to have a closer look at it again...

hylkevds commented 1 year ago

Debugging the potential off-by-one problem. Using a segment size of 1000, and an in-queue message size of 500. Reading the first message results in a remainingInSegment of 1000 with the +1. tailSegment.bytesAfter(existingTail) returns 999. So the first +1 is correct.

The second one then also becomes clear. The block in that if should only run if it is a simple read without any side effects. In practice that means if there is at least one byte left after we read the full message. If the segment is exactly emptied by the read, we already have a special case, and have to use the second block.

I propose we change the if to just a > without the +1 instead of a >= with a +1. And change the comment for that block to currentSegments contains fully the payload _with space left over_ That should be clearer.