Closed adixitconfluent closed 1 month ago
I've checked the 3 test failures. They are unrelated to the PR. I ran all of them locally and they all passed.
hi @mumrah,
I'm wondering if we can remove the fetchQueue code in this PR.
You're right, we don't need the fetch queue. I have created a JIRA https://issues.apache.org/jira/browse/KAFKA-17545 for it, and will prioritize it in the coming PRs.
Hi @junrao @mumrah , I have responded/changed my code to address your comments. Please take a look when you can, thanks!
I was still a bit confused as to why we were completing other purgatory items from within the share fetch purgatory, so @adixitconfluent and I sync'd offline. The purpose of this was to allow other delayed share fetches to check for completion in the case of new records being produced. Instead of this, I suggested that we should include a TopicPartition key in addition to the SharePartition key when creating the delayed operation. This would let us tie into the HWM listener so we could directly complete pending share fetches when the HWM increased.
This would let us avoid passing the delayed action queue and purgatory into the delayed share fetch operation. We can keep this PR scoped to adding the delayed action queue and add the produce/HWM callback in a future PR.
WDYT @junrao? Does this sound reasonable?
The purpose of this was to allow other delayed share fetches to check for completion in the case of new records being produced.
For share fetch, even when there is no new data in the partition, there are currently situations in which we may need to trigger a check on the delayed share fetch.
Currently, we need the action queue in delayed share fetch operation for 1. I am not sure that's truly needed. We could revisit that when we add the minBytes support.
When there is new data in the partition, we may also need to trigger a check on the delayed share fetch. This could be done by adding a TopicPartition as the key for the delayed share fetch or by somehow mapping a TopicPartition to an existing SharePartition key.
The share partition lock is freed
I'm not sure I understand this comment. AFAIK, we only lock the share partition when acquiring records or otherwise modifying the in-flight records. I think in all these cases (acquiring new records, releasing old records, acquisition timeouts) we have the opportunity to call the purgatory to see if requests are completed.
Basically, I'd like delayed share fetch to be modeled similarly to delayed fetch where the operation only has enough context to complete itself (i.e., fetch params, ReplicaManager, and a few other things) and the calls to complete the operation all happen externally. I think we can achieve this since the completion scenarios all happen in SharePartitionManager or SharePartition.
This could be done by adding a TopicPartition as the key for the delayed share fetch
Yea, this is how I was thinking it would work. Each ShareFetchRequest would have keys for each (topic, partition) and (topic, partition, group).
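A minimal sketch of the two-key idea, to make the discussion concrete. Everything here is a hypothetical stand-in (PendingShareFetch, TwoKeyPurgatorySketch, and the key helpers are not Kafka classes, and this is not the real purgatory API); it only assumes that one pending operation can be watched under both a (topic, partition) key and a (group, topic, partition) key, and that a check on either key can complete it.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical pending operation: it only knows how to decide whether it can complete.
final class PendingShareFetch {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final BooleanSupplier canComplete;

    PendingShareFetch(BooleanSupplier canComplete) {
        this.canComplete = canComplete;
    }

    // Completes at most once, and only if the completion condition currently holds.
    boolean tryComplete() {
        return canComplete.getAsBoolean() && completed.compareAndSet(false, true);
    }

    boolean isCompleted() {
        return completed.get();
    }
}

// Hypothetical purgatory that lets one operation be watched under several keys.
final class TwoKeyPurgatorySketch {
    private final Map<List<Object>, List<PendingShareFetch>> watchers = new ConcurrentHashMap<>();

    // Key an HWM listener could use: (topic, partition).
    static List<Object> topicPartitionKey(String topic, int partition) {
        return List.of(topic, partition);
    }

    // Key share-partition events could use: (group, topic, partition).
    static List<Object> sharePartitionKey(String groupId, String topic, int partition) {
        return List.of(groupId, topic, partition);
    }

    void watch(PendingShareFetch op, List<List<Object>> keys) {
        for (List<Object> key : keys) {
            watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
        }
    }

    // Called externally (e.g. when the HWM advances); returns how many operations completed now.
    int checkAndComplete(List<Object> key) {
        List<PendingShareFetch> ops = watchers.get(key);
        if (ops == null) {
            return 0;
        }
        int completedNow = 0;
        for (PendingShareFetch op : ops) {
            if (op.tryComplete()) {
                completedNow++;
            }
        }
        // Drop operations that are done, including ones completed through another key.
        ops.removeIf(PendingShareFetch::isCompleted);
        return completedNow;
    }
}
```

With this shape, the operation carries no reference to the purgatory or to an action queue; whoever observes the event (new data, lock release, acquisition timeout) calls checkAndComplete with the matching key.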
Basically, I'd like delayed share fetch to be modeled similarly to delayed fetch where the operation only has enough context to complete itself (i.e., fetch params, ReplicaManager, and a few other things) and the calls to complete the operation all happen externally. I think we can achieve this since the completion scenarios all happen in SharePartitionManager or SharePartition.
Got it. You are suggesting that instead of adding a delayed action in DelayedShareFetch.onComplete, we can add it in SharePartition.releaseFetchLock. The slight difference is that in the current approach, we can wait for all partitions' locks to be released before adding the delayed action. This potentially allows the woken-up delayedShareFetch to grab the lock on more partitions. If we do it inside SharePartition.releaseFetchLock, we may lose that opportunity.
To get rid of delayedAction in DelayedShareFetch, we could potentially add a method in SharePartitionManager that takes a set of partitions and does the following. We can then pass in SharePartitionManager to DelayedShareFetch so that the method can be called there?
delayedActionQueue.add(() -> {
    result.keySet().forEach(topicIdPartition ->
        delayedShareFetchPurgatory.checkAndComplete(
            new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
    return BoxedUnit.UNIT;
});
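A self-contained sketch of how such a manager method could look, under stated assumptions: DelayedActionQueueSketch and SharePartitionManagerSketch are hypothetical stand-ins rather than the Kafka classes, partitions are plain strings, and the purgatory check is replaced by recording the key that would have been checked. The point it illustrates is that a single action covering all partitions is enqueued once the caller has released every lock, and the check only runs when the queue is drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a delayed action queue: actions are queued now and run later.
final class DelayedActionQueueSketch {
    private final Queue<Runnable> actions = new ConcurrentLinkedQueue<>();

    void add(Runnable action) {
        actions.add(action);
    }

    // Runs every queued action; intended to be called outside of any partition lock.
    void tryCompleteActions() {
        Runnable next;
        while ((next = actions.poll()) != null) {
            next.run();
        }
    }
}

// Hypothetical stand-in for the manager that owns both the queue and the purgatory.
final class SharePartitionManagerSketch {
    private final DelayedActionQueueSketch delayedActionQueue;

    // Stands in for purgatory.checkAndComplete(key): records which keys were checked.
    final List<String> checkedKeys = new ArrayList<>();

    SharePartitionManagerSketch(DelayedActionQueueSketch delayedActionQueue) {
        this.delayedActionQueue = delayedActionQueue;
    }

    // Assumption: the caller has already released the fetch lock on every partition in the set.
    void scheduleCheck(String groupId, Set<String> partitions) {
        List<String> snapshot = new ArrayList<>(partitions);
        // One queued action for the whole set, so the check sees all locks as free.
        delayedActionQueue.add(() ->
            snapshot.forEach(partition -> checkedKeys.add(groupId + ":" + partition)));
    }
}
```

The delayed share fetch would then only need a reference to the manager, not to the queue or the purgatory.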
About
In reference to comment https://github.com/apache/kafka/pull/16969#discussion_r1750770738 , I have introduced a DelayedActionQueue to add purgatory actions and try to complete them.
Delayed actions are added to the DelayedActionQueue when partition locks are released after fetch in forceComplete. Also, code has been added to onExpiration to check the delayed actions queue and try to complete it. Since onExpiration serves as a callback for forceComplete, it should not lead to an infinite call stack.
Fixed failures in DelayedShareFetchTest which were occurring due to insufficient mocking.
Testing
The code has been tested with the help of unit tests.
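For the call-stack point in the About section, a small sketch of why running extra work from onExpiration should not recurse. It assumes the usual delayed-operation shape in which a timeout calls forceComplete and then onExpiration only if that call actually completed the operation; DelayedOperationSketch and CountingShareFetch are hypothetical names, and the Runnable stands in for draining the delayed action queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical base class mirroring the assumed forceComplete/onExpiration relationship.
abstract class DelayedOperationSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Only the first caller wins; every later call is a no-op that returns false.
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // Timeout path: onExpiration runs only if this call completed the operation.
    void onTimeout() {
        if (forceComplete()) {
            onExpiration();
        }
    }

    abstract void onComplete();

    abstract void onExpiration();
}

// Hypothetical operation whose onExpiration also drains some queued follow-up work.
final class CountingShareFetch extends DelayedOperationSketch {
    int completions = 0;
    int expirations = 0;
    private final Runnable drainActionQueue;

    CountingShareFetch(Runnable drainActionQueue) {
        this.drainActionQueue = drainActionQueue;
    }

    @Override
    void onComplete() {
        completions++;
    }

    @Override
    void onExpiration() {
        expirations++;
        // Even if the drained work calls forceComplete on this operation again,
        // the completed flag is already set, so the call returns immediately.
        drainActionQueue.run();
    }
}
```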