MrAdam opened this issue 10 months ago.
Could you try upgrading to Rebus.RabbitMq 8.1.0 (if you're on Rebus 7) or 9.0.0-alpha03 (if you're using a Rebus 8 prerelease)?
I believe this particular issue happens to have been fixed about 1 hour ago 😅
Let me know how it goes!
Thank you for the quick response 😄
I just tried upgrading Rebus.RabbitMq to 8.1.0, but I still get the exception.
I am testing this locally by running a two-node RabbitMq cluster in Docker. I have also set up a test endpoint that "spams" RabbitMq with messages for 30 seconds:
[HttpGet]
public async Task<IActionResult> Spam()
{
    var endTime = DateTime.UtcNow.AddSeconds(30);
    while (DateTime.UtcNow < endTime)
    {
        await _bus.Send(new TestMessage());
        await Task.Delay(TimeSpan.FromMilliseconds(100));
    }

    return Ok();
}
After sending a request to this endpoint, I put the RabbitMq node that the test is connected to into maintenance mode:
docker exec rabbitmq-node1 rabbitmq-upgrade drain
This causes the exception mentioned previously to be thrown in the test endpoint, and the application does not recover from it the way I had hoped:
[10.32.05 Rebus.Pipeline.Send.SendOutgoingMessageStep [Debug] Sending "TestMessage" -> "other-service"
[10.32.05 Rebus.RabbitMq.RabbitMqTransport [Warning] Closed channel detected - consumer will be disposed
[10.32.05 Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware [Error] An unhandled exception has occurred while executing the request.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text='CONNECTION_FORCED - Node was put into maintenance mode', classId=0, methodId=0
at RabbitMQ.Client.Impl.ModelBase.WaitForConfirms(TimeSpan timeout, Boolean& timedOut)
at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie(TimeSpan timeout)
at Rebus.RabbitMq.RabbitMqTransport.DoSend(IEnumerable`1 outgoingMessages, IModel model, Boolean isExpress)
at Rebus.RabbitMq.RabbitMqTransport.SendOutgoingMessages(IEnumerable`1 outgoingMessages, ITransactionContext context)
at Rebus.Transport.AbstractRebusTransport.<>c__DisplayClass2_1.<<Send>b__1>d.MoveNext()
--- End of stack trace from previous location ---
at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
at Rebus.Transport.TransactionContext.Complete()
at Rebus.Bus.RebusBus.InnerSend(IEnumerable`1 destinationAddresses, Message logicalMessage)
at Rebus.Bus.RebusBus.Send(Object commandMessage, IDictionary`2 optionalHeaders)
at Test.DevelopmentClientService.Controllers.DevelopmentController.Spam() in C:\Users\Adam\Projects\test\src\Test.DevelopmentClientService\Controllers\DevelopmentController.cs:line 30
at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfIActionResultExecutor.Execute(ActionContext actionContext, IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeInnerFilterAsync>g__Awaited|13_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|20_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)
at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)
at Microsoft.AspNetCore.Authentication.AuthenticationMiddleware.Invoke(HttpContext context)
at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddlewareImpl.Invoke(HttpContext context)
[10.32.06 Rebus.Internals.ConnectionManager [Information] Existing connection found to be CLOSED
[10.32.06 Rebus.RabbitMq.RabbitMqTransport [Information] Successfully initialized consumer for "test-service"
The same thing happens with Rebus.RabbitMq version 9.0.0-alpha03.
Ok, damn 😅 Could you show me how you configure Rebus?
Sorry 😛
Here's my complete setup code for Rebus:
serviceCollection.AddRebus(
    (configure, services) =>
    {
        var rabbitMqSettings = services.GetRequiredService<RabbitMqSettings>();
        var sslSettings = new SslSettings(
            rabbitMqSettings.ConnectionString.StartsWith(
                "amqps://",
                StringComparison.OrdinalIgnoreCase
            ),
            new Uri(rabbitMqSettings.ConnectionString).Host,
            version: SslProtocols.Tls12
        );

        return configure
            .Logging(logging => logging.Serilog())
            .Transport(
                transport =>
                    transport
                        .UseRabbitMq(
                            rabbitMqSettings.ConnectionString,
                            rabbitMqSettings.InputQueueName
                        )
                        .Ssl(sslSettings)
                        .ClientConnectionName(
                            Assembly.GetExecutingAssembly().GetName().Name
                        )
                        .InputQueueOptions(
                            queue => queue.SetQueueTTL(rabbitMqSettings.InputQueueTtl)
                        )
            )
            .Serialization(
                serialization =>
                    serialization.UseSystemTextJson(
                        new JsonSerializerOptions().ConfigureForNodaTime(
                            DateTimeZoneProviders.Tzdb
                        )
                    )
            )
            .Routing(
                routing =>
                    routing
                        .TypeBased()
                        .Map<TestMessage>(
                            rabbitMqSettings.OtherServiceQueueName
                        )
            )
            .Options(options =>
            {
                options.SetNumberOfWorkers(1);
                options.SetMaxParallelism(1);
                options.HandleException<MessageCouldNotBeDispatchedToAnyHandlersException>(
                    (_, context) =>
                    {
                        Log.Information(
                            "Message type {@Type} with headers {@Headers} could not be dispatched to any handlers",
                            context.Message.GetMessageType(),
                            context.Headers
                        );
                        return Task.CompletedTask;
                    }
                );
            });
    }
);
And if you'd like an easy way to run a RabbitMQ cluster locally for testing, this is the docker-compose.yml I use:
version: "3"
name: rabbitmq-cluster

services:
  # docker exec rabbitmq-node1 rabbitmq-upgrade drain
  # docker exec rabbitmq-node1 rabbitmq-upgrade revive
  rabbitmq-node1:
    image: rabbitmq:3-management
    container_name: rabbitmq-node1
    hostname: rabbitmq-node1
    environment:
      - RABBITMQ_DEFAULT_USER=rabbitmq
      - RABBITMQ_DEFAULT_PASS=rabbitmq
      - RABBITMQ_DEFAULT_VHOST=/
      - RABBITMQ_ERLANG_COOKIE=123456
    labels:
      - "traefik.enable=true"
      - "traefik.tcp.routers.rabbitmq.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq.entrypoints=rabbitmq"
      - "traefik.tcp.routers.rabbitmq.service=rabbitmq"
      - "traefik.tcp.services.rabbitmq.loadbalancer.server.port=5672"
      - "traefik.tcp.routers.rabbitmq-management.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq-management.entrypoints=rabbitmq-management"
      - "traefik.tcp.routers.rabbitmq-management.service=rabbitmq-management"
      - "traefik.tcp.services.rabbitmq-management.loadbalancer.server.port=15672"

  # docker exec rabbitmq-node2 rabbitmq-upgrade drain
  # docker exec rabbitmq-node2 rabbitmq-upgrade revive
  rabbitmq-node2:
    image: rabbitmq:3-management
    container_name: rabbitmq-node2
    hostname: rabbitmq-node2
    environment:
      - RABBITMQ_ERLANG_COOKIE=123456
    command: >
      bash -c "
        /usr/local/bin/docker-entrypoint.sh rabbitmq-server -detached
        sleep 5
        rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@`env hostname`.pid
        rabbitmqctl stop_app
        rabbitmqctl join_cluster rabbit@rabbitmq-node1
        rabbitmqctl start_app
        tail -f /var/log/rabbitmq/*.log
      "
    labels:
      - "traefik.enable=true"
      - "traefik.tcp.routers.rabbitmq.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq.entrypoints=rabbitmq"
      - "traefik.tcp.routers.rabbitmq.service=rabbitmq"
      - "traefik.tcp.services.rabbitmq.loadbalancer.server.port=5672"
      - "traefik.tcp.routers.rabbitmq-management.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq-management.entrypoints=rabbitmq-management"
      - "traefik.tcp.routers.rabbitmq-management.service=rabbitmq-management"
      - "traefik.tcp.services.rabbitmq-management.loadbalancer.server.port=15672"
    depends_on:
      - rabbitmq-node1

  load-balancer:
    image: traefik:3.0
    container_name: load-balancer
    command:
      - "--api.insecure=true"
      - "--providers.docker=true"
      - "--entrypoints.rabbitmq.address=:5672"
      - "--entrypoints.rabbitmq-management.address=:15672"
    ports:
      - "15672:15672"
      - "5672:5672"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - rabbitmq-node1
      - rabbitmq-node2
We had a similar issue when we had our maintenance window in AWS for RabbitMQ. To be able to reconnect successfully to the cluster after one node was taken down we implemented a custom connection factory (RabbitMQ.Client.IConnectionFactory) where we used Polly to retry the connection until the connection could be successfully established again. A custom connection factory can be provided by calling CustomizeConnectionFactory on RabbitMqOptionsBuilder.
@simongullberg, the default RabbitMQ.Client.ConnectionFactory has AutomaticRecoveryEnabled set to true by default and uses a RabbitMQ.Client.Framing.Impl.AutorecoveringConnection. I would have assumed that this would catch these kinds of errors and re-establish the connection, but it seems that recovery doesn't kick in if you send a message before the client has detected that the connection is lost.
@mookid8000 This seems to me like something that would be great to have integrated into Rebus.RabbitMq. Maybe @simongullberg could open source their implementation, and we could integrate it into the library?
@simongullberg it seems your solution won't work in this case. I just tried implementing a custom IConnectionFactory and using it, but it is only invoked when a new connection is being created. The call to Send on the IBus still fails, because it tries to send on a connection that is broken.
@MrAdam Rebus initializes its RabbitMQ ConnectionFactory like this: https://github.com/rebus-org/Rebus.RabbitMq/blob/master/Rebus.RabbitMq/Internals/ConnectionManager.cs#L139-L148
What would you suggest I do to make it better at overcoming connection issues?
@mookid8000 I'm not really sure what the best approach is here.
I did some more digging, and it seems that Rebus.RabbitMq.RabbitMqTransport calls RabbitMQ.Client.Impl.AutorecoveringModel.BasicPublish(...) before the connection has been recovered.
Debugging into the application, I can see that the error occurs when model.BasicPublish is called while model.IsClosed is true.
As noted in the RabbitMQ .NET Client API Guide (https://www.rabbitmq.com/dotnet-api-guide.html#publishers):
Messages that are published using IModel.BasicPublish when connection is down will be lost. The client does not enqueue them for delivery after connection has recovered. To ensure that published messages reach RabbitMQ applications need to use Publisher Confirms and account for connection failures.
Which makes sense. But what I feel like I'm missing is some way to hook into Rebus.RabbitMq.RabbitMqTransport.DoSend(...) to allow me to add some custom logic for retrying if model.BasicPublish fails.
The only way I see to solve this at the moment is to wrap all calls to IBus in a try/catch, one way or another, and catch (AlreadyClosedException exception) when (exception.ShutdownReason.ReplyCode == 320).
This could be done through a wrapper class that implements IBus and replaces the original IBus registration in the service collection, but that would require reimplementing quite a lot of methods.
I think that having a way to handle exceptions inside RabbitMqTransport would be preferable, unless there is a better place in the hierarchy for this?
"Which makes sense. But what I feel like I'm missing is some way to hook into Rebus.RabbitMq.RabbitMqTransport.DoSend(...) to allow me to add some custom logic for retrying, if model.BasicPublish fails."
I don't think it should be necessary to hook into the transport's internals; its error handling should be built in. I actually thought that Rebus' RabbitMQ transport used publisher confirms for this, but apparently something is missing (or is not properly re-initialized when it has gone bad).
@MrAdam yes, you are right. This is another use case, and we actually ran into it just a couple of days ago. We already have a bus wrapper implemented, so my idea was simply to wrap IBus.Send() in a try/catch like you described. It turned out that this did not work for us, because we use the Rebus.TransactionScopes package and scope.EnlistRebus(), which means that the messages are sent later, when we call scope.Complete().
@mookid8000 do you have any idea how we can handle transient exceptions that are thrown by the underlying RabbitMQ.Client library when using scope.EnlistRebus(), and retry the send operation?
@mookid8000 do you have any ideas about this?
@simongullberg sorry for being so slow to reply 😅
I am just reading up on this issue again – if I understand it correctly, is the problem then that
await bus.Send(something);
fails with a RabbitMQ "the-model-is-closed" exception, and then if you were to wait a second and retry
await bus.Send(something);
then maybe it would succeed?
If that's the case, then my opinion is that the retry should be performed by the caller and not internally in the transport.
E.g. if your await bus.Send(something) is in a Rebus handler, it will automatically be retried a couple of times. Or, if you are in a web app, you would know best which Polly strategy to use so that await bus.Send(something) can overcome whichever problems you anticipate.
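To make the caller-side retry concrete, here is a minimal sketch in plain C# without Polly. RetryAsync is a hypothetical helper, not part of Rebus, and the demo uses a stand-in delegate that throws InvalidOperationException twice instead of a real bus, so the snippet runs on its own. It assumes the send is not enlisted in an ambient transaction, so that each attempt is a brand-new call to Send.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical caller-side retry: re-invokes `action` as long as `isTransient`
// classifies the failure as retryable and attempts remain.
async Task RetryAsync(Func<Task> action, Func<Exception, bool> isTransient, int maxAttempts, TimeSpan delay)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await action();
            return;
        }
        catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
        {
            // Give the client a moment to fail over / reconnect before the next attempt.
            // On the last attempt the filter is false, so the exception propagates.
            await Task.Delay(delay);
        }
    }
}

// Demo: a stand-in for `bus.Send(...)` that fails twice, then succeeds.
var calls = 0;
await RetryAsync(
    () =>
    {
        calls++;
        if (calls < 3) throw new InvalidOperationException("simulated closed channel");
        return Task.CompletedTask;
    },
    ex => ex is InvalidOperationException,
    maxAttempts: 5,
    delay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"Succeeded after {calls} attempts");
```

In the test endpoint from this thread, the call might look like `await RetryAsync(() => _bus.Send(new TestMessage()), ex => ex is AlreadyClosedException closed && closed.ShutdownReason?.ReplyCode == 320, maxAttempts: 5, delay: TimeSpan.FromSeconds(1));`, reusing the exception filter suggested earlier in the thread. Whether a delayed attempt succeeds depends on the transport having reconnected in the meantime; the log above shows the connection being re-established about a second after the failure, which suggests a short delay is worth trying, but that is not verified here. A Polly retry policy would replace the hand-written loop with a configured strategy.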
@mookid8000, no worries :)
I think what you say makes perfect sense, but since we are using a TransactionScope with EnlistRebus(), I think the actual sending happens somewhere else?
using (var scope = TransactionUtils.CreateTransactionScope())
{
    scope.EnlistRebus();
    await _bus.Send(something); // The actual send will not take place here.
    scope.Complete();
}
When the .NET TransactionScope is completed, the underlying RebusTransactionScope is completed and the messages are sent to RabbitMQ, so I guess the actual retrying needs to take place somewhere in here: https://github.com/rebus-org/Rebus.TransactionScopes/blob/master/Rebus.TransactionScopes/TransactionScopes/TransactionScopeExtensions.cs#L46
Yeah, in the code you posted, the messages will be sent when you call scope.Complete();
If you want to retry sending with the example you posted, it would require a new transaction scope, because there's a pretty high risk that the transaction has gone bad from failing (e.g. by caching a failed IModel).
I tried wrapping the RebusTransactionScope.Complete() method in a try/catch block and, in the catch, sleeping for a while and then calling RebusTransactionScope.Complete() again. Unfortunately, that did not work, because the TransactionContext on the RebusTransactionScope is marked as completed even though the first completion attempt failed. When RebusTransactionScope.Complete() is called again in the catch, the TransactionContext throws a "ThrowCompletedException".
@mookid8000, is it possible in some way to create a new RebusTransactionScope with a new TransactionContext and get the outgoing messages from the previous TransactionContext?
@simongullberg Yeah unfortunately, the transaction scope changes state as part of the completion attempt, and so it would not be able to guarantee that it would be able to repeat the same actions, so as you've correctly experienced yourself, completing/aborting the scope is a one-off 😅
And no, it's not readily possible. The reason is that it's slightly more involved than simply getting the outgoing messages from the transaction context, because it's possible for the transport implementation and the pipeline steps and other Rebus extensions to have hooked up callbacks that do stuff to the context's OnCommit/OnRollBack/OnAck/OnNack events.
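Since a scope can only be completed once and its outgoing messages cannot simply be moved to a new scope, one option is to retry the entire unit of work with a fresh scope per attempt. The sketch below assumes the work can safely be re-run from the start (for example, that the failed attempt changed nothing outside the transaction). It uses only System.Transactions.TransactionScope: the scope.EnlistRebus() call is only indicated in a comment, RunInNewScopeWithRetryAsync is a hypothetical name, and the failing send is simulated. Depending on where the failure surfaces, the RabbitMQ exception may arrive wrapped (for example in a TransactionAbortedException), so a real isTransient predicate may need to inspect InnerException; that has not been verified against Rebus.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Transactions;

// Hypothetical helper: retries the whole unit of work, opening a brand-new TransactionScope
// for every attempt, because a scope that has already tried to complete cannot be completed again.
async Task RunInNewScopeWithRetryAsync(
    Func<Task> work, Func<Exception, bool> isTransient, int maxAttempts, TimeSpan delay)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                // Real code would call scope.EnlistRebus() here (Rebus.TransactionScopes, not referenced).
                await work();
                scope.Complete();
            }
            // Disposing a completed scope commits it; with EnlistRebus() that is presumably
            // where outgoing messages are sent, so such a failure is still caught below.
            return;
        }
        catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
        {
            // The failed scope has already been disposed (and rolled back); wait, then start over.
            await Task.Delay(delay);
        }
    }
}

// Demo: the simulated "send" fails twice, then succeeds; each attempt records its ambient transaction.
var seenTransactions = new List<string>();
await RunInNewScopeWithRetryAsync(
    () =>
    {
        seenTransactions.Add(Transaction.Current?.TransactionInformation.LocalIdentifier ?? "none");
        if (seenTransactions.Count < 3) throw new InvalidOperationException("simulated send failure");
        return Task.CompletedTask;
    },
    ex => ex is InvalidOperationException,
    maxAttempts: 5,
    delay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"Attempts: {seenTransactions.Count}, distinct transactions: {seenTransactions.Distinct().Count()}");
```

Recording the ambient transaction's LocalIdentifier makes it visible that every retry runs under its own transaction instead of trying to re-complete the first one.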
I've run into an issue with Rebus.RabbitMq and Amazon MQ for RabbitMQ, where AWS automatic version upgrades cause a RabbitMQ.Client.Exceptions.AlreadyClosedException that I have been unable to handle and recover from.
I have a RabbitMQ cluster with three nodes, which at some point will be put into maintenance mode one by one for upgrading. However, this causes the application to throw an exception if it tries to send a message after the channel has been disposed.
There's nothing wrong with that, but it should be possible to try again, as the other nodes are still alive and well. I have tried injecting a new step in the IPipeline right before the SendOutgoingMessageStep, in the hope that I could catch this exception and retry sending, but the Process method of this new step is only reached when messages are sent successfully, and never when the error occurs.
Am I missing something in regard to handling errors in the ITransport implementation that would allow me to catch and retry when I get this exception?