BEagle1984 / silverback

Silverback is a simple but feature-rich message bus for .NET core (it currently supports Kafka, RabbitMQ and MQTT).
https://silverback-messaging.net
MIT License

Concurrency exception in DbDistributedLockManager #173

Open davimorao opened 2 years ago

davimorao commented 2 years ago

Hello, I'm testing with the latest version of Silverback (3.8.0) and I'm getting a DbUpdateConcurrencyException from the Microsoft.EntityFrameworkCore package.

Stack trace

    Silverback.Background.DbDistributedLockManager[27] Failed to send heartbeat for lock OutboxWorker (2f3cf9ce029d45a3ace0e49016cc87d2).
    Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ThrowAggregateUpdateConcurrencyException(Int32 commandIndex, Int32 expectedRowsAffected, Int32 rowsAffected)
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeResultSetWithPropagationAsync(Int32 commandIndex, RelationalDataReader reader, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeAsync(RelationalDataReader reader, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(IList`1 entriesToSave, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(StateManager stateManager, Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.SqlServer.Storage.Internal.SqlServerExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(String resourceName, String uniqueId, IServiceProvider serviceProvider)
       at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(DistributedLockSettings settings)

Package versions:

    .NET 6
    Silverback.Core Version="3.8.0"
    Silverback.Core.EntityFrameworkCore Version="3.0.1"
    Silverback.Core.Model Version="3.8.0"
    Silverback.Integration.Kafka Version="3.8.0"
    Microsoft.EntityFrameworkCore.SqlServer Version="6.0.10"
    Microsoft.EntityFrameworkCore.Tools Version="6.0.10"

Here is the relevant code:

Startup

    public static IServiceCollection AddSilverbackServices(this IServiceCollection services)
    {
        services.AddSilverback()
            .UseDbContext<KafkaManagementDbContext>()

            // Set up the lock manager using the database
            // to handle the distributed locks.
            // If this line is omitted the OutboxWorker will still
            // work without locking.
            .AddDbDistributedLockManager()

            .WithConnectionToMessageBroker(options =>
                options.AddKafka()
                       .AddOutboxDatabaseTable()
                       .AddOutboxWorker())
            .AddEndpointsConfigurator<InboundEndpointsConfigurator>()
            .AddEndpointsConfigurator<OutboundEndpointsConfigurator>()
            .AddSingletonSubscriber<OrderSubscriber>();

        return services;
    }

InboundEndpointsConfigurator

    builder
        .AddKafkaEndpoints(endpoints => endpoints
            .Configure(config =>
            {
                config.BootstrapServers = "PLAINTEXT://localhost:9092";
            })
            .AddInbound(endpoint => endpoint
                .ConsumeFrom("order-events")
                .Configure(config =>
                {
                    config.GroupId = "order-consumer";
                })
                .OnError(policy => policy.Retry(3, TimeSpan.FromSeconds(1)))));

OutboundEndpointsConfigurator

    builder
        .AddKafkaEndpoints(endpoints => endpoints
            .Configure(config =>
            {
                config.BootstrapServers = "PLAINTEXT://localhost:9092";
            })
            .AddOutbound<OrderCommand>(endpoint => endpoint
                .ProduceTo("order-events")
                .ProduceToOutbox()));

OrderSubscriber

    public class OrderSubscriber
    {
        private readonly ILogger _logger;

        public OrderSubscriber(ILogger logger)
        {
            _logger = logger;
        }

        public void OnMessageReceived(OrderCommand message)
        {
            _logger.LogInformation($"Received Id: {message.Id}, CreatedAt: {message.CreatedAt:dd/MM/yyyy - HH:mm:ss}");
        }
    }

DbContext

    public class KafkaManagementDbContext : DbContext
    {
        public KafkaManagementDbContext(DbContextOptions options)
            : base(options)
        {
            this.Database.EnsureCreated();
        }

        public DbSet<OutboxMessage> Outbox { get; set; } = null!;
        public DbSet<InboundLogEntry> InboundMessages { get; set; } = null!;
        public DbSet<StoredOffset> StoredOffsets { get; set; } = null!;
        public DbSet<Lock> Locks { get; set; } = null!;

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<InboundLogEntry>()
                        .HasKey(t => new { t.MessageId, t.ConsumerGroupName });
        }
    }

OrderCommand

    public class OrderCommand : IIntegrationCommand
    {
        public Guid Id { get; set; }
        public DateTime CreatedAt { get; set; }
    }

Publish

    await _publisher.PublishAsync(new OrderCommand { Id = Guid.NewGuid(), CreatedAt = DateTime.Now });
    await _dbContext.SaveChangesAsync();

BEagle1984 commented 2 years ago

This is unfortunately a known issue (but thank you very much for writing the issue, so it stays here as documentation for other users).

It doesn't affect the functionality, since the heartbeat is updated frequently and failing to do that every once in a while will not cause any issue.

To be honest, I'm not very happy with the current design of the entire database integration and the implementation of the distributed lock. I'm reimplementing those parts from scratch in the upcoming major version, to target the specific database engines and take advantage of their individual features. This is why I didn't invest in further improving the current implementation.

In my opinion those errors can just be ignored. Or did you observe any related issue (besides the ugly error in the log)?

alefcarlos commented 1 year ago

@BEagle1984, good ?

I tried to suppress this log event, but now it logs as FATAL:

    silverbackBuilder
        .WithLogLevels(configurator => configurator
            .SetLogLevel(CoreLogEvents.FailedToSendDistributedLockHeartbeat.EventId, LogLevel.None));

[11:18:05 FTL] Failed to send heartbeat for lock cd-ms-sample-api (0d5ff1c6202f4cbfa6f0d45dcd658116).
Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.

Without the SetLogLevel it logs as ERROR.

[11:19:58 ERR] Failed to send heartbeat for lock cd-ms-sample-api (c075cf801ae3456288f8a01337038cc8).
Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.

BEagle1984 commented 1 year ago

I'm not sure about the behavior of the None level. Have you tried with an actual level like Information or Debug?

alefcarlos commented 1 year ago

Debug/Information is working as expected.
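
For anyone landing here with the same log noise, the configuration that was reported to work is the earlier snippet with an actual level in place of `LogLevel.None`. A minimal sketch, reusing the `silverbackBuilder` variable and the `WithLogLevels`/`SetLogLevel` calls from the comment above; the `using` directives are assumptions about where `CoreLogEvents` and `LogLevel` live and may need adjusting for your Silverback version:

```csharp
using Microsoft.Extensions.Logging; // LogLevel
using Silverback.Diagnostics;       // CoreLogEvents (assumed namespace)

// Downgrade the heartbeat failure from Error to Debug so it no longer
// shows up at the default logging threshold. LogLevel.None is avoided
// because, as reported in this thread, it caused the event to be logged
// as FATAL.
silverbackBuilder
    .WithLogLevels(configurator => configurator
        .SetLogLevel(
            CoreLogEvents.FailedToSendDistributedLockHeartbeat.EventId,
            LogLevel.Debug));
```

This only changes the severity of the log entry. The heartbeat itself still fails occasionally, which according to the maintainer does not affect the functionality.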