Closed Queenferno closed 2 years ago
I've just written a unit test for this based on the code you've provided, and it seems to be working as per your use case, with a message processed approximately every second.
Are you using the exact same policy instance for all requests/threads to read a message from RabbitMQ?
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading.Tasks;
using Polly.RateLimit;
using Xunit;
using Xunit.Abstractions;
namespace Polly.Specs.RateLimit;
public class Issue939
{
private readonly ITestOutputHelper _outputHelper;
public Issue939(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}
[Fact]
public async Task Consume_Messages()
{
var policy = GetPolicy(1, _outputHelper);
var now = DateTimeOffset.UtcNow;
var sw = Stopwatch.StartNew();
_outputHelper.WriteLine($"Start: {now:u}");
_ = await policy.ExecuteAsync(() => Task.FromResult(new HttpResponseMessage()));
_outputHelper.WriteLine($"After one: {DateTimeOffset.UtcNow:u}");
_ = await policy.ExecuteAsync(() => Task.FromResult(new HttpResponseMessage()));
_outputHelper.WriteLine($"After two: {DateTimeOffset.UtcNow:u}");
_ = await policy.ExecuteAsync(() => Task.FromResult(new HttpResponseMessage()));
_outputHelper.WriteLine($"After three: {DateTimeOffset.UtcNow:u}");
sw.Stop();
Assert.True(sw.Elapsed > TimeSpan.FromSeconds(1));
}
private static IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps, ITestOutputHelper outputHelper)
{
if (mps <= 0)
{
throw new ArgumentOutOfRangeException(nameof(mps));
}
return Policy
.HandleResult<HttpResponseMessage>(result => {
return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
})
.Or<RateLimitRejectedException>()
.WaitAndRetryForeverAsync((retryNum, context) => {
outputHelper.WriteLine($"Retrying. Num: {retryNum}");
return TimeSpan.FromSeconds(1);
}).WrapAsync(
Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}
}
Polly.Specs.RateLimit.Issue939.Consume_Messages
Source: Issue939.cs line 21
Duration: 2 sec
Standard Output:
Start: 2022-05-26 12:26:52Z
After one: 2022-05-26 12:26:52Z
Retrying. Num: 1
After two: 2022-05-26 12:26:53Z
Retrying. Num: 1
After three: 2022-05-26 12:26:54Z
If I comment out the .Or<RateLimitRejectedException>()
line, then I get this output:
Polly.Specs.RateLimit.Issue939.Consume_Messages
Source: Issue939.cs line 21
Duration: 15 ms
Message:
Polly.RateLimit.RateLimitRejectedException : The operation has been rate-limited and should be retried after 00:00:00.9906806
Stack Trace:
AsyncRateLimitEngine.ImplementationAsync[TResult](IRateLimiter rateLimiter, Func`3 retryAfterFactory, Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext) line 30
AsyncPolicy.ExecuteAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext) line 228
<<ImplementationAsync>b__0>d.MoveNext() line 36
--- End of stack trace from previous location ---
AsyncRetryEngine.ImplementationAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates`1 shouldRetryResultPredicates, Func`5 onRetryAsync, Int32 permittedRetryCount, IEnumerable`1 sleepDurationsEnumerable, Func`4 sleepDurationProvider, Boolean continueOnCapturedContext) line 37
AsyncPolicy`1.ExecuteAsync(Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext) line 118
AsyncPolicyWrapEngine.ImplementationAsync[TResult](Func`3 func, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext, IAsyncPolicy`1 outerPolicy, IAsyncPolicy innerPolicy) line 35
AsyncPolicy`1.ExecuteAsync(Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext) line 118
Issue939.Consume_Messages() line 34
--- End of stack trace from previous location ---
Standard Output:
Start: 2022-05-26 12:28:58Z
After one: 2022-05-26 12:28:58Z
Yes I'm using the same Policy throughout my project. And it's missbehaving. If a set mps = 10, the RateLimiter takes 5 messages per second (and not 10, as expected)
I'm at my wits end
If I change the test to parameterise the mps
value (and use 10
) and make the delay spread out by the rate limit (TimeSpan.FromSeconds(1.0 / mps)
) rather than wait an entire second, then I still get the correct behaviour with 10 messages processed in approximately 1 second.
If there is an issue here, I can't reproduce it with the code samples you've provided.
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading.Tasks;
using Polly.RateLimit;
using Xunit;
using Xunit.Abstractions;
namespace Polly.Specs.RateLimit;
public class Issue939
{
private readonly ITestOutputHelper _outputHelper;
public Issue939(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}
[Fact]
public async Task Consume_Messages()
{
const int Mps = 10;
var policy = GetPolicy(Mps, _outputHelper);
var now = DateTimeOffset.UtcNow;
var sw = Stopwatch.StartNew();
_outputHelper.WriteLine($"Start: {now:yyyy-MM-ddTHH\\:mm\\:ss.fffzzz}");
for (int i = 0; i < Mps; i++)
{
_ = await policy.ExecuteAsync(() => Task.FromResult(new HttpResponseMessage()));
_outputHelper.WriteLine($"After {i}: {DateTimeOffset.UtcNow:yyyy-MM-ddTHH\\:mm\\:ss.fffzzz}");
}
sw.Stop();
_outputHelper.WriteLine($"End: {now:yyyy-MM-ddTHH\\:mm\\:ss.fffzzz}");
_outputHelper.WriteLine($"Duration: {sw.Elapsed}");
}
private static IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps, ITestOutputHelper outputHelper)
{
if (mps <= 0)
{
throw new ArgumentOutOfRangeException(nameof(mps));
}
return Policy
.HandleResult<HttpResponseMessage>(result => {
return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
})
.Or<RateLimitRejectedException>()
.WaitAndRetryForeverAsync((retryNum, context) => {
outputHelper.WriteLine($"Retrying. Num: {retryNum}");
return TimeSpan.FromSeconds(1.0 / mps);
}).WrapAsync(
Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}
}
Polly.Specs.RateLimit.Issue939.Consume_Messages
Source: Issue939.cs line 21
Duration: 1 sec
Standard Output:
Start: 2022-05-26T12:47:52.711+00:00
After 0: 2022-05-26T12:47:52.720+00:00
Retrying. Num: 1
After 1: 2022-05-26T12:47:52.836+00:00
Retrying. Num: 1
After 2: 2022-05-26T12:47:52.958+00:00
Retrying. Num: 1
After 3: 2022-05-26T12:47:53.068+00:00
Retrying. Num: 1
After 4: 2022-05-26T12:47:53.177+00:00
Retrying. Num: 1
After 5: 2022-05-26T12:47:53.286+00:00
Retrying. Num: 1
After 6: 2022-05-26T12:47:53.395+00:00
Retrying. Num: 1
After 7: 2022-05-26T12:47:53.504+00:00
Retrying. Num: 1
After 8: 2022-05-26T12:47:53.613+00:00
Retrying. Num: 1
After 9: 2022-05-26T12:47:53.721+00:00
End: 2022-05-26T12:47:52.711+00:00
Duration: 00:00:01.0095679
I wrote a Console Application. My code is as follows:
using Polly;
var policy = GetPolicy(2);
for (var i = 0; i < 10000; i++)
{
await policy.ExecuteAsync(() =>
{
Console.WriteLine(DateTime.Now);
return Task.FromResult(new HttpResponseMessage());
});
}
IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps)
{
if (mps <= 0)
{
throw new ArgumentOutOfRangeException(nameof(mps));
}
return Policy
.HandleResult<HttpResponseMessage>(result =>
{
return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
})
.Or<Polly.RateLimit.RateLimitRejectedException>()
.WaitAndRetryForeverAsync((retryNum, context) =>
{
Console.WriteLine($"Retrying. Num: {retryNum}");
return TimeSpan.FromSeconds(2);
}).WrapAsync(
Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}
For some reason the Console.WriteLine()
writes every two seconds. Even though I've allowed it 10 executions per second. Something really weird is going on
The 10 executions are divided by 1 second, so it's effectively 1 request every 100 milliseconds. As you use return TimeSpan.FromSeconds(2);
when the rate limit is exceeded (which happens when more than 1 request in 100 milliseconds occurs), you're "wasting" 900 milliseconds of your usage waiting for 2 seconds until you try another request.
TimeSpan.FromSeconds(1.0 / mps)
As I mentioned in the previous reply, you need to scale your wait based on your mps
value.
If I change the code to:
return Policy .HandleResult<HttpResponseMessage>(result => { return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests; }) .Or<Polly.RateLimit.RateLimitRejectedException>() .WaitAndRetryForeverAsync((retryNum, context) => { Console.WriteLine($"Retrying. Num: {retryNum}"); return TimeSpan.FromSeconds(2); }).WrapAsync( Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
then my Console.WriteLine executes once per second (1/sec). Instead of 2/sec as I stated in my GetPolicy(2);
But when it gets rate limited, you still have TimeSpan.FromSeconds(2)
, which is limiting how fast it will retry.
I wrote it wrong. I changed it to
.WaitAndRetryForeverAsync((retryNum, context) => { Console.WriteLine($"Retrying. Num: {retryNum}"); return TimeSpan.FromSeconds(1.0 / mps ); })
and my GetPolicy(2)
writes once (1) every second. Instead of twice (2) per second
If I change your wait to be proportional to the rate limit period, it does 2 requests per second, as expected.
using Polly;
var policy = GetPolicy(2);
for (var i = 0; i < 10000; i++)
{
await policy.ExecuteAsync(() =>
{
Console.WriteLine(DateTime.Now);
return Task.FromResult(new HttpResponseMessage());
});
}
IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps)
{
if (mps <= 0)
{
throw new ArgumentOutOfRangeException(nameof(mps));
}
var perTimeSpan = TimeSpan.FromSeconds(1);
return Policy
.HandleResult<HttpResponseMessage>(result =>
{
return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
})
.Or<Polly.RateLimit.RateLimitRejectedException>()
.WaitAndRetryForeverAsync((retryNum, context) =>
{
Console.WriteLine($"Retrying. Num: {retryNum}");
return perTimeSpan / mps;
}).WrapAsync(
Policy.RateLimitAsync(mps, perTimeSpan));
}
❯ dotnet run --configuration Release
26/05/2022 15:14:49
Retrying. Num: 1
26/05/2022 15:14:50
Retrying. Num: 1
26/05/2022 15:14:50
Retrying. Num: 1
26/05/2022 15:14:51
Retrying. Num: 1
26/05/2022 15:14:51
Retrying. Num: 1
26/05/2022 15:14:52
Retrying. Num: 1
26/05/2022 15:14:52
Retrying. Num: 1
26/05/2022 15:14:53
Retrying. Num: 1
26/05/2022 15:14:53
Retrying. Num: 1
26/05/2022 15:14:54
Retrying. Num: 1
26/05/2022 15:14:54
Retrying. Num: 1
26/05/2022 15:14:55
Retrying. Num: 1
26/05/2022 15:14:55
Retrying. Num: 1
26/05/2022 15:14:56
Retrying. Num: 1
26/05/2022 15:14:56
Retrying. Num: 1
26/05/2022 15:14:57
Retrying. Num: 1
26/05/2022 15:14:57
Retrying. Num: 1
26/05/2022 15:14:58
Retrying. Num: 1
26/05/2022 15:14:58
Retrying. Num: 1
26/05/2022 15:14:59
Retrying. Num: 1
26/05/2022 15:14:59
Retrying. Num: 1
26/05/2022 15:15:00
Retrying. Num: 1
26/05/2022 15:15:00
Retrying. Num: 1
26/05/2022 15:15:01
Retrying. Num: 1
26/05/2022 15:15:02
Retrying. Num: 1
26/05/2022 15:15:02
Retrying. Num: 1
26/05/2022 15:15:03
Retrying. Num: 1
26/05/2022 15:15:03
Retrying. Num: 1
26/05/2022 15:15:04
Retrying. Num: 1
26/05/2022 15:15:04
Retrying. Num: 1
26/05/2022 15:15:05
Retrying. Num: 1
26/05/2022 15:15:05
Retrying. Num: 1
26/05/2022 15:15:06
Retrying. Num: 1
26/05/2022 15:15:06
Retrying. Num: 1
26/05/2022 15:15:07
Retrying. Num: 1
26/05/2022 15:15:07
Retrying. Num: 1
26/05/2022 15:15:08
Retrying. Num: 1
26/05/2022 15:15:08
Retrying. Num: 1
26/05/2022 15:15:09
Retrying. Num: 1
26/05/2022 15:15:09
Retrying. Num: 1
26/05/2022 15:15:10
Retrying. Num: 1
26/05/2022 15:15:10
Retrying. Num: 1
26/05/2022 15:15:11
Retrying. Num: 1
26/05/2022 15:15:11
Retrying. Num: 1
26/05/2022 15:15:12
Retrying. Num: 1
26/05/2022 15:15:12
Retrying. Num: 1
26/05/2022 15:15:13
Retrying. Num: 1
26/05/2022 15:15:13
Retrying. Num: 1
26/05/2022 15:15:14
Retrying. Num: 1
26/05/2022 15:15:14
Retrying. Num: 1
26/05/2022 15:15:15
Retrying. Num: 1
26/05/2022 15:15:15
Retrying. Num: 1
26/05/2022 15:15:16
Retrying. Num: 1
26/05/2022 15:15:16
Retrying. Num: 1
26/05/2022 15:15:17
Retrying. Num: 1
26/05/2022 15:15:17
Retrying. Num: 1
26/05/2022 15:15:18
Retrying. Num: 1
26/05/2022 15:15:18
Retrying. Num: 1
26/05/2022 15:15:19
Retrying. Num: 1
26/05/2022 15:15:19
Retrying. Num: 1
26/05/2022 15:15:20
Retrying. Num: 1
26/05/2022 15:15:20
Retrying. Num: 1
26/05/2022 15:15:21
Retrying. Num: 1
26/05/2022 15:15:21
Retrying. Num: 1
26/05/2022 15:15:22
Retrying. Num: 1
26/05/2022 15:15:22
Retrying. Num: 1
26/05/2022 15:15:23
Retrying. Num: 1
I have a RabbitMQ Queue, filled with thousands of messages. I need my consumer to consume 1 message per second, so I have implemented a RateLimit policy using Polly. My configuration is as follows:
where mps = 1
Now what I've noticed is the following:
If I set the mps to 50, the following happens:
Is there a bug with the Policy.RateLimitAsync call? Am I doing something wrong?