Closed Rzpeg closed 5 years ago
It's as if renewalTask.Dispose() is never called to cancel the renewal task on completion, so by the time the renewal actually runs (at 70% of the lease duration), the message no longer exists.
From an inspection of the code, I found a "hole" where the peek lock renewal task would not be disposed, if the message handler threw an exception.
Could it be that this issue was observed for cases where the message handler threw?
In any case, this particular issue was fixed in Rebus.AzureServiceBus 7.0.0-a03
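For context, the fix for such a "hole" boils down to making disposal of the renewal task unconditional, i.e. wrapping the handler invocation in try/finally. A minimal self-contained sketch of the pattern follows; the RenewalTask and HandleMessageAsync names are illustrative stand-ins, not Rebus's actual internals:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative stand-in for a peek lock renewal task, not Rebus's actual internals
class RenewalTask : IDisposable
{
    readonly Timer timer;

    public RenewalTask(TimeSpan lockDuration)
    {
        // schedule renewal at ~70% of the lock duration
        var dueTime = TimeSpan.FromTicks((long)(lockDuration.Ticks * 0.7));
        timer = new Timer(_ => Console.WriteLine("renewing peek lock"), null, dueTime, dueTime);
    }

    public void Dispose() => timer.Dispose();
}

class Demo
{
    static async Task HandleMessageAsync(Func<Task> handler)
    {
        var renewal = new RenewalTask(TimeSpan.FromMinutes(5));
        try
        {
            await handler();
        }
        finally
        {
            // without this finally, a throwing handler would leave the renewal
            // task running against a message that no longer holds a valid lock
            renewal.Dispose();
        }
    }
}
```

The key point is that Dispose() runs on both the success and the exception path, so the renewal can never outlive the message it was started for.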
That's strange, because no exceptions were thrown from handlers. Could it be because AutoComplete is now true by default on the receiver? We no longer have the long-running handlers, so we removed the AutomaticallyRenewPeekLock() option, and the errors stopped. Also, we cannot update to v7 because of the breaking change related to naming.
Just a note: Since Rebus.AzureServiceBus 7.0.0-a04 it's possible to go .UseLegacyNaming()
on the ASB configuration extensions, e.g. like this:
Configure.With(...)
.Transport(t => t.UseAzureServiceBus(conn, "queue-name").UseLegacyNaming())
.Start();
which will adjust the naming strategy to work as it did in v6 and previous versions.
There's something weird happening in the RenewPeekLock task termination process that might be causing the original issue. I have made a simple console app with Rebus which publishes/consumes messages on multiple threads, without exceptions in handlers, and sometimes I get the following warning: Rebus.Threading.TaskParallelLibrary.TplAsyncTask [Warning] Periodic task "RenewPeekLock-9296026a-e79b-42a5-8941-7ca29b972c15" did not finish within 5 second timeout! So it's possible that when/if it finishes, the lock supplied is indeed invalid.
Installed packages:
<ItemGroup>
  <PackageReference Include="AutoFixture">
    <Version>4.6.0</Version>
  </PackageReference>
  <PackageReference Include="Rebus">
    <Version>5.1.0</Version>
  </PackageReference>
  <PackageReference Include="Rebus.Autofac">
    <Version>5.2.0</Version>
  </PackageReference>
  <PackageReference Include="Rebus.AzureServiceBus">
    <Version>6.0.4</Version>
  </PackageReference>
  <PackageReference Include="Rebus.Jil">
    <Version>4.0.0</Version>
  </PackageReference>
  <PackageReference Include="Rebus.Serilog">
    <Version>5.0.0</Version>
  </PackageReference>
  <PackageReference Include="Serilog.Enrichers.Environment">
    <Version>2.1.2</Version>
  </PackageReference>
  <PackageReference Include="Serilog.Exceptions">
    <Version>4.1.0</Version>
  </PackageReference>
  <PackageReference Include="Serilog.Sinks.Console">
    <Version>3.1.1</Version>
  </PackageReference>
</ItemGroup>
Here's the code itself:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using AutoFixture;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Retry.Simple;
using Rebus.Routing.TypeBased;
using Serilog;
using Serilog.Exceptions;

namespace RebusAzureTest
{
    class Program
    {
        static async Task Main(string[] args)
        {
            Log.Logger = new LoggerConfiguration()
                .Enrich.FromLogContext()
                .Enrich.WithExceptionDetails()
                .Enrich.WithMachineName()
                .Enrich.WithEnvironmentUserName()
                .WriteTo.Console(outputTemplate: "{Timestamp:HH:mm:ss} {SourceContext} [{Level}] {Message}{NewLine}{Exception}")
                .CreateLogger();

            var containerBuilder = new ContainerBuilder();

            containerBuilder
                .RegisterType<DoStuffHandler>()
                .As<IHandleMessages<DoStuffCommand>>();

            containerBuilder
                .RegisterType<StuffDoneHandler>()
                .As<IHandleMessages<StuffDoneEvent>>();

            containerBuilder
                .RegisterRebus(config => config
                    .Logging(l => l.Serilog())
                    .Serialization(s => s.UseJil(Jil.Options.ISO8601IncludeInherited))
                    .Transport(t => t.UseAzureServiceBus(
                            "",
                            "tester.input")
                        .AutomaticallyRenewPeekLock())
                    .Options(o => o.SimpleRetryStrategy("tester.error"))
                    .Options(o => o.SetNumberOfWorkers(5))
                    .Options(o => o.SetMaxParallelism(10))
                    .Options(o => o.SetWorkerShutdownTimeout(TimeSpan.FromMinutes(4)))
                    .Routing(r => r.TypeBased()
                        .Map<DoStuffCommand>("tester.input"))
                );

            var container = containerBuilder.Build();
            var bus = container.Resolve<IBus>();

            await bus.Subscribe<StuffDoneEvent>()
                .ConfigureAwait(false);

            var fixture = new Fixture();
            var commands = fixture.CreateMany<DoStuffCommand>(500);
            var events = fixture.CreateMany<StuffDoneEvent>(500);
            var commandsTasks = commands.Select(command => bus.Send(command));
            var eventsTasks = events.Select(@event => bus.Publish(@event));

            Task.Run(() => Task.WhenAll(commandsTasks.Union(eventsTasks)));

            var mre = new ManualResetEvent(false);
            mre.WaitOne();
        }
    }

    public class DoStuffCommand
    {
        public string Stuff { get; set; }
    }

    public class StuffDoneEvent
    {
        public string Stuff { get; set; }
    }

    public class DoStuffHandler : IHandleMessages<DoStuffCommand>
    {
        private readonly ILogger logger = Log.Logger.ForContext<DoStuffHandler>();

        public async Task Handle(DoStuffCommand message)
        {
            this.logger.Information("{@message}", message);

            await Task.Delay(1)
                .ConfigureAwait(false);
        }
    }

    public class StuffDoneHandler : IHandleMessages<StuffDoneEvent>
    {
        private readonly ILogger logger = Log.Logger.ForContext<StuffDoneHandler>();

        public async Task Handle(StuffDoneEvent message)
        {
            this.logger.Information("{@message}", message);

            await Task.Delay(1)
                .ConfigureAwait(false);
        }
    }
}
Thanks for providing code, I'll take a look at it.
You might need to run the app a few times: out of 10 runs, two had no warnings, and the rest averaged 3-6 warnings per run.
Thanks so much for your short and simple reproduction, it made it easy for me to spot the bug, which turned out to be in Rebus' handling of task cancellation.
The reason was in TplAsyncTask, where a CancellationToken was passed to the Task running the whole thing, which would (apparently!) sometimes lead to the containing task being cancelled, thus not giving it a chance to call .Set() on the reset event passed to it.
This little thing made all the difference.
So, to never get the warning again, update Rebus to 5.2.1.
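The race can be sketched like this; it is a simplified stand-alone illustration of passing a CancellationToken to Task.Run, not the actual TplAsyncTask code:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Simplified illustration of the pitfall, not Rebus's actual TplAsyncTask code
class CancellationPitfall
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        var stopped = new ManualResetEvent(false);

        var task = Task.Run(async () =>
        {
            try
            {
                // cooperative cancellation: poll the token between iterations
                while (!cts.Token.IsCancellationRequested)
                {
                    await Task.Delay(100);
                }
            }
            finally
            {
                stopped.Set(); // signal that the background work has finished
            }
        }, cts.Token); // BUG: if the token fires before the delegate starts,
                       // the task is cancelled outright, the finally never runs,
                       // and stopped.Set() is never called

        cts.Cancel();

        // ...so this wait can time out, which is what produced the
        // "did not finish within 5 second timeout!" warning
        var finished = stopped.WaitOne(TimeSpan.FromSeconds(5));
        Console.WriteLine(finished ? "stopped cleanly" : "timed out waiting for stop");
    }
}
```

Dropping the token argument from Task.Run means cancellation is only ever observed cooperatively inside the delegate, so the finally block, and with it .Set(), always runs.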
Thanks again for your help with solving this issue!
Thanks for solving the issue!
Hello! After updating the transport from v5 to v6, which changed the driver from WindowsAzure.ServiceBus to Microsoft.Azure.ServiceBus, we started to get thousands of exceptions of type Microsoft.Azure.ServiceBus.MessageLockLostException ("The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue"), originating from Rebus.AzureServiceBus.AzureServiceBusTransport, with the stack trace:
We have .AutomaticallyRenewPeekLock() enabled, the queues were created by Rebus with the previous driver, and the lock duration is 5 minutes.
Our config is (over Autofac):
Any hint?