apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[fix][broker] refactor cursor read entry process to fix dead loop read issue of txn #22944

Open TakaHiR07 opened 3 months ago

TakaHiR07 commented 3 months ago

Motivation

  1. Fix the issue https://github.com/apache/pulsar/issues/22943. This issue is serious and actually makes transactions unavailable.

This PR is similar to an earlier PR, https://github.com/apache/pulsar/pull/14286. Since that PR was closed, I have reimplemented it on the master branch, improved some of the logic, and added tests.

  2. This PR is also related to another issue that needs to be improved: https://github.com/apache/pulsar/issues/23027

Modifications

  1. Add a field named maxReadPosition to ManagedLedgerImpl. If a read op enables maxReadPosition, it additionally checks hasMoreEntriesByMaxReadPosition().
  2. Add a field named waitingCursorsByMaxReadPosition to ManagedLedgerImpl. When a cursor has a read op in the wait state, the read op is put into this queue. When maxReadPosition is updated, the read op is polled from the queue and notified.
  3. In TopicTransactionBuffer, every operation that updates maxReadPosition also syncs the new value to maxReadPosition in ManagedLedgerImpl.
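
The three modifications above can be pictured with a minimal sketch. This is not the code in this PR: SketchLedger is a hypothetical stand-in for ManagedLedgerImpl, positions are simplified to plain long values, and the method bodies are assumptions. Only the names maxReadPosition, waitingCursorsByMaxReadPosition and hasMoreEntriesByMaxReadPosition() come from the description.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-in for ManagedLedgerImpl (not Pulsar code).
// Positions are plain longs; entry ids start at 0.
class SketchLedger {
    private volatile long lastConfirmedEntry = -1; // last entry written to the ledger
    private volatile long maxReadPosition = -1;    // last entry that is safe to dispatch

    // Read ops parked because their cursor reached maxReadPosition.
    private final Queue<Runnable> waitingCursorsByMaxReadPosition =
            new ConcurrentLinkedQueue<>();

    // Append one entry (e.g. a message of a still-open transaction).
    void addEntry() {
        lastConfirmedEntry++;
    }

    // True if a cursor whose next entry is readPosition may read right now.
    boolean hasMoreEntriesByMaxReadPosition(long readPosition) {
        return readPosition <= Math.min(lastConfirmedEntry, maxReadPosition);
    }

    // Park a read op until maxReadPosition moves.
    void addWaitingCursor(Runnable readOp) {
        waitingCursorsByMaxReadPosition.add(readOp);
    }

    // Called when the transaction buffer advances its own maxReadPosition:
    // store the new value, then poll and notify every parked read op.
    void updateMaxReadPosition(long newPosition) {
        maxReadPosition = newPosition;
        Runnable readOp;
        while ((readOp = waitingCursorsByMaxReadPosition.poll()) != null) {
            readOp.run();
        }
    }
}
```

In this sketch a cursor that finds hasMoreEntriesByMaxReadPosition() false parks its read op instead of retrying, and it is woken only when updateMaxReadPosition() runs. Making the check-then-park sequence atomic is deliberately left out here.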

Currently:

I have left many comments in the code at the places that may be points of concern.

This PR tries to keep the read process unchanged when transactions are disabled. It only aims to fix the issue that appears when transactions are enabled.


dao-jun commented 3 months ago

The implementation looks like it could work, but I'm not sure it is a good idea to maintain maxReadPosition in the ML layer, since it is a messaging-layer isolation policy whose purpose is to provide READ_COMMITTED semantics.

TakaHiR07 commented 3 months ago

At first I also tried to keep maxReadPosition out of the ML layer. However, if the sequence is hasMoreEntry() check -> maxReadPosition change -> add to waitingCursor, there is a risk that the waiting cursor is never notified. So the ML layer needs to know the actual value of maxReadPosition.
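
The risk described here is a lost wakeup, and it can be sketched as follows. The class below is a hypothetical illustration, not Pulsar code: WaitingCursorSketch, readOrWait() and waitingCount() are invented names, and positions are plain long values. It contrasts the separate check and park steps with a variant where the layer that owns the queue also knows maxReadPosition and can do both under one lock.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of the lost-wakeup risk (not Pulsar code).
class WaitingCursorSketch {
    private long maxReadPosition = -1;
    private final Queue<Runnable> waitingCursors = new ArrayDeque<>();

    // Racy variant, step 1: the check on its own.
    synchronized boolean hasMoreEntry(long readPosition) {
        return readPosition <= maxReadPosition;
    }

    // Racy variant, step 3: park the read op in a separate call.
    synchronized void addWaitingCursor(Runnable readOp) {
        waitingCursors.add(readOp);
    }

    // Safe variant: check and park happen atomically, which is only possible
    // because this layer knows the current maxReadPosition.
    synchronized boolean readOrWait(long readPosition, Runnable readOp) {
        if (readPosition <= maxReadPosition) {
            return true; // readable now, nothing is parked
        }
        waitingCursors.add(readOp);
        return false;
    }

    // Step 2 in the racy sequence: advance the position and notify whoever
    // is parked at that moment. Callbacks run outside the lock.
    void updateMaxReadPosition(long newPosition) {
        Queue<Runnable> toNotify;
        synchronized (this) {
            maxReadPosition = newPosition;
            toNotify = new ArrayDeque<>(waitingCursors);
            waitingCursors.clear();
        }
        toNotify.forEach(Runnable::run);
    }

    synchronized int waitingCount() {
        return waitingCursors.size();
    }
}
```

If hasMoreEntry(0) returns false, updateMaxReadPosition(0) then runs against an empty queue, and only afterwards addWaitingCursor() is called, the read op stays parked even though entry 0 is already readable. With readOrWait() the update cannot slip in between the check and the park, so the op is either served immediately or notified by the next update.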

thetumbled commented 3 months ago

PTAL, thanks. @congbobo184 @liangyepianzhou