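There is a bug in the consumer class that makes it impossible to set a partition offset while the consumer is under load. When the fetch response queue is full, the worker task blocks while adding messages, and once it resumes it overwrites any offset the client set in the meantime: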
    foreach (var message in response.Messages)
    {
        // Will block if the queue is full
        _fetchResponseQueue.Add(message, _disposeToken.Token);
        if (_disposeToken.IsCancellationRequested) return;
    }
    var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1;
    // Will overwrite any offset set by the client while blocking above
    _partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);
I have changed the offset collection so that each entry also includes a boolean that indicates whether the offset was set from outside the worker task.
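As a rough illustration of the idea (a minimal sketch, not the actual code in this change), the offset entry could carry the flag next to the offset, and the worker could skip its update whenever the flag is set. The names `PartitionOffset`, `PartitionOffsetIndex`, `SetFromClient`, `TakeForFetch` and `AdvanceFromWorker` are hypothetical, and the default starting offset of 0 is an assumption.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical value type: an offset plus a flag recording whether
// it was written from outside the worker task.
public readonly struct PartitionOffset
{
    public PartitionOffset(long offset, bool setByClient)
    {
        Offset = offset;
        SetByClient = setByClient;
    }

    public long Offset { get; }
    public bool SetByClient { get; }
}

// Hypothetical wrapper around the offset collection.
public sealed class PartitionOffsetIndex
{
    private readonly ConcurrentDictionary<int, PartitionOffset> _offsets =
        new ConcurrentDictionary<int, PartitionOffset>();

    // Client side: always wins, and marks the entry as externally set.
    public void SetFromClient(int partitionId, long offset)
    {
        var entry = new PartitionOffset(offset, setByClient: true);
        _offsets.AddOrUpdate(partitionId, entry, (id, current) => entry);
    }

    // Worker side, before a fetch: returns the offset to fetch from and
    // clears the flag, so a later client write can be told apart.
    // Assumption: an unknown partition starts at offset 0.
    public long TakeForFetch(int partitionId)
    {
        return _offsets.AddOrUpdate(
            partitionId,
            new PartitionOffset(0, false),
            (id, current) => new PartitionOffset(current.Offset, false)).Offset;
    }

    // Worker side, after queueing the messages: advance only if the client
    // has not written an offset since TakeForFetch; otherwise keep the
    // client's value so the next fetch starts there.
    public void AdvanceFromWorker(int partitionId, long nextOffset)
    {
        _offsets.AddOrUpdate(
            partitionId,
            new PartitionOffset(nextOffset, false),
            (id, current) => current.SetByClient
                ? current
                : new PartitionOffset(nextOffset, false));
    }
}
```

In this sketch the worker would call `TakeForFetch` before each fetch request and `AdvanceFromWorker` in place of the unconditional `AddOrUpdate` shown above, so an offset set by the client while the queue is blocked survives until the next fetch.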