zarusz / SlimMessageBus

Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
Apache License 2.0
475 stars 78 forks source link

#251 Health check circuit breaker #282

Open EtherZa opened 3 months ago

EtherZa commented 3 months ago

Addition of a health check circuit breaker for consumers (SlimMessageBus.Host.CircuitBreaker.HealthCheck).

Consumers can be optionally associated with one or more tags (and severities) in order to temporarily pause the consumer while a health check is failing.

The HealthCheckBackgroundService taps into the published health reports to distribute health status changes generated by the Microsoft.Extensions.Diagnostics.HealthChecks library. It is added as a hosted service to catch reports outside of the life time of the SlimMessageBus.

    // add health checks with tags
    builder.Services
        .AddHealthChecks()
        .AddCheck<StorageHealthCheck>("Storage", tags: ["Storage"]);
        .AddCheck<SqlServerHealthCheck>("SqlServer", tags: ["Sql"]);

    builder.Services
        .AddSlimMessageBus(mbb => {
            ...

            mbb.Consume<Message>(cfg => {
                ...

                // configure consumer to monitor tag/state
                cfg.PauseOnUnhealthyCheck("Storage");
                cfg.PauseOnDegradedHealthCheck("Sql");
            })
        })

To add the functionality, a few questionable changes have been made. Please shout if you are not comfortable with any of them/would prefer an alternate implementation.

A sample application has been included (Sample.CircuitBreaker.HealthCheck) to demonstrate the functionality with health checks that fail randomly.

Ideas for future improvements

Although I have no plans on doing this now, the functionality could be extended by incorporating an IConsumerInterceptor to watch for exceptions and, on catching one, initiate an ad hoc health check (once per exception type/period/etc). The evaluation period could then be reduced from the standard health check publishing interval to a reactive check based on an application failure backed up by the default cadence.

EtherZa commented 3 months ago

Update 2: That wasn't it either.

OutboxTests.Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHandlingRaisedAtTheEnd_Then_TransactionIsRolledBack_And_NoDataSaved_And_NoEventRaised is attempting a distributed transaction promotion.

[xUnit.net 00:01:07.39] [08:10:42 DBG] SlimMessageBus.Host.Outbox.OutboxForwardingPublishInterceptor Forwarding published message of type CustomerCreatedEvent to the outbox
[xUnit.net 00:01:07.39] [08:10:42 ERR] Microsoft.EntityFrameworkCore.Update An exception occurred in the database while saving changes for context type 'SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess.CustomerContext'.
[xUnit.net 00:01:07.39] System.PlatformNotSupportedException: This platform does not support distributed transactions.
[xUnit.net 00:01:07.39] at System.Transactions.Oletx.OletxTransactionManager.CreateTransaction(TransactionOptions options)
[xUnit.net 00:01:07.39] at System.Transactions.TransactionStatePromoted.EnterState(InternalTransaction tx)
[xUnit.net 00:01:07.39] at System.Transactions.EnlistableStates.Promote(InternalTransaction tx)
[xUnit.net 00:01:07.39] at System.Transactions.Transaction.Promote()

Update: Sql also throws this "connectivity" exception when a task is cancelled while spooling the data.

Condition added to catch SqlException and check if the token has been cancelled. If so a TaskCanceledException exception will be thrown instead.

https://learn.microsoft.com/en-us/answers/questions/207655/sqlexception-with-error-code-0-and-message-a-sever


~@zarusz It looks like sql may be experiencing an issue.~

[07:36:13 ERR] SlimMessageBus.Host.Outbox.Services.OutboxSendingTask Error while processing outbox messages
 Microsoft.Data.SqlClient.SqlException (0x80131904): A severe error occurred on the current command.  The results, if any, should be discarded.
 Operation cancelled by user.
    at Microsoft.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
    at Microsoft.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, SqlCommand command, Boolean callerHasConnectionLock, Boolean asyncClose)
    at Microsoft.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
    at Microsoft.Data.SqlClient.SqlDataReader.TryConsumeMetaData()
    at Microsoft.Data.SqlClient.SqlDataReader.get_MetaData()
    at Microsoft.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString, Boolean isInternal, Boolean forDescribeParameterEncryption, Boolean shouldCacheForAlwaysEncrypted)
    at Microsoft.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader(Boolean isInternal, Boolean forDescribeParameterEncryption)
    at Microsoft.Data.SqlClient.SqlCommand.EndExecuteReaderInternal(IAsyncResult asyncResult)
    at Microsoft.Data.SqlClient.SqlCommand.EndExecuteReaderAsync(IAsyncResult asyncResult)
    at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
 --- End of stack trace from previous location ---
    at SlimMessageBus.Host.Outbox.Sql.SqlOutboxRepository.LockAndSelect(String instanceId, Int32 batchSize, Boolean tableLock, TimeSpan lockDuration, CancellationToken token) in /_/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs:line 54
    at SlimMessageBus.Host.Outbox.Services.OutboxSendingTask.SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken) in /_/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs:line 245
    at SlimMessageBus.Host.Outbox.Services.OutboxSendingTask.Run() in /_/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs:line 173
 ClientConnectionId:39c33641-24b9-4302-92bb-4b475a76b5ad
zarusz commented 3 months ago

@EtherZa excellent feature and nice proposed API. Agreed that in the future a consumer interceptor could also observe for failure rates and proactively switch off the circuit - that could be added later.

I will be able to give it some review in 1.5 weeks as I am out now without my laptop.

EtherZa commented 3 months ago

@zarusz No worries. I just hope that's a pleasure trip :)

sonarcloud[bot] commented 3 months ago

Quality Gate Passed Quality Gate passed

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
87.7% Coverage on New Code
0.0% Duplication on New Code

See analysis details on SonarCloud

zarusz commented 3 weeks ago

@EtherZa would it be possible to either:

  1. Rebase on release/v3, resolve few conflicts and change PR target to release/v3? I could merge as is and then refactor based on my earlier ask (shift some responsibilities).
  2. Incorporate my earlier feedback. I understand you might have other priorities, hence option 1.
EtherZa commented 3 weeks ago

@zarusz I have rebased the change on release/v3. If you have the appetite to do so, you are more than welcome to jump in and make the changes. Work committments are still a little on the heavy side at the moment.

sonarcloud[bot] commented 3 weeks ago

Quality Gate Passed Quality Gate passed

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
90.6% Coverage on New Code
0.0% Duplication on New Code

See analysis details on SonarCloud