What this PR does
Some of the tests assume that all produced records are fetched in one go. The Kafka protocol doesn't guarantee that, and kfake sometimes returns fewer records as well. As a result, the tests sometimes finished early, before all records had been fetched.
This PR introduces logPollFetches, which calls fetchers.PollFetches repeatedly until it has fetched the expected number of records or a timeout expires.
I also updated every place that relied on this assumption, as well as every place that already did some form of long polling.
2s timeout
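I set the timeout to 2s because the tests use a MinBytesMaxWait of 1s (code). The following sequence shows how records that have already been fetched can still be delayed by a full 1s wait: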
1. The fetcher receives 1 record (1st request).
2. The fetcher sends the fetchResult to fetchWant.result.
3. The fetcher receives 2 records (2nd request).
4. The fetcher can't send the result to fetchWant.result because start() isn't receiving on the channel; start() is trying to send to orderedFetches.
5. The fetcher starts another (3rd) attempt; the attempt takes 1s (MaxWaitMillis in the Fetch request).
6. The client calls PollFetches and reads from orderedFetches; it receives the first Fetches with 1 record. start() goes back to waiting on fetchWant.result.
7. The client calls PollFetches again; start() is still waiting on fetchWant.result.
8. The fetcher receives the response to the 3rd request with no records and sends the buffered 2 records from the 2nd request.
9. The client receives the 2 records.
An alternative to waiting for 2s would be, for example, to merge results in start().
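One way such merging could look is the common Go pattern of disabling a select case with a nil channel. This is a hypothetical sketch, not how start() is implemented: mergeForward is an illustrative name, in plays the role of fetchWant.result, out plays the role of orderedFetches, and batches are simplified to []int.

```go
package main

// mergeForward keeps receiving new batches while it is still trying to
// deliver the previous ones, appending them to a single pending batch instead
// of leaving them stuck with the producer.
func mergeForward(in <-chan []int, out chan<- []int) {
	var pending []int
	for {
		// A nil channel is never ready, so the send case is only enabled
		// while there is something to deliver.
		var sendCh chan<- []int
		if len(pending) > 0 {
			sendCh = out
		}
		select {
		case batch, ok := <-in:
			if !ok {
				// Input closed: flush what is left and stop.
				if len(pending) > 0 {
					out <- pending
				}
				close(out)
				return
			}
			pending = append(pending, batch...)
		case sendCh <- pending:
			pending = nil // the receiver now owns the slice
		}
	}
}
```

In this model, the 2 records from the 2nd request would be appended to the batch that start() is still trying to deliver, so the client's next read can return them without waiting for the 3rd request's 1s MaxWait.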
Which issue(s) this PR fixes or relates to
Fixes https://github.com/grafana/mimir/issues/9970
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
about-versioning.md updated with experimental features.