dotnetcore / sharding-core

high performance lightweight solution for efcore sharding table and sharding database support read-write-separation .一款ef-core下高性能、轻量级针对分表分库读写分离的解决方案,具有零依赖、零学习成本、零业务代码入侵
https://xuejmnet.github.io/sharding-core-doc/
Apache License 2.0
1.17k stars 171 forks source link

在上下文 中 封装事务 报错 #273

Open xiantaibai opened 3 months ago

xiantaibai commented 3 months ago

执行 dbcontext.SaveEntitiesAsync(); 错误类型 System.InvalidOperationException: The specified transaction is not associated with the current connection. Only transactions associated with the current connection may be used. at Microsoft.EntityFrameworkCore.Storage.RelationalTransaction..ctor(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger1 logger, Boolean transactionOwned, ISqlGenerationHelper sqlGenerationHelper) at Microsoft.EntityFrameworkCore.Storage.RelationalTransactionFactory.Create(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger1 logger, Boolean transactionOwned) at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.CreateRelationalTransaction(DbTransaction transaction, Guid transactionId, Boolean transactionOwned) at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.UseTransaction(DbTransaction transaction, Guid transactionId) at Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.UseTransaction(DatabaseFacade databaseFacade, DbTransaction transaction, Guid transactionId) at Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.UseTransaction(DatabaseFacade databaseFacade, DbTransaction transaction) at ShardingCore.Sharding.ShardingDbContextExecutors.DataSourceDbContext.JoinCurrentTransaction() at ShardingCore.Sharding.ShardingDbContextExecutors.DataSourceDbContext.NotifyTransaction() at ShardingCore.Sharding.ShardingDbContextExecutors.ShardingDbContextExecutor.NotifyShardingTransaction() at ShardingCore.EFCores.ShardingRelationalTransactionManager.BeginTransaction(IsolationLevel isolationLevel) at ShardingCore.EFCores.ShardingRelationalTransactionManager.BeginTransaction() at Microsoft.EntityFrameworkCore.Infrastructure.DatabaseFacade.BeginTransaction()

数据库上下文

public class IotDbContext : AbstractShardingDbContext, IShardingTableDbContext
{
    public DbSet<DeviceInfo> DeviceInfo { get; set; }
    public DbSet<Message> Message { get; set; }

    public IotDbContext(DbContextOptions options, IMediator mediator, IServiceProvider provider) : base(options)
    {

    }
    //public IotDbContext(DbContextOptions options) : base(options)
    //{
    //}

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        if (modelBuilder is null)
        {
            throw new ArgumentNullException(nameof(modelBuilder));
        }

        // modelBuilder.ApplyConfiguration(new DeviceInfoEntityTypeConfiguration());
        modelBuilder.ApplyConfigurationsFromAssembly(typeof(DeviceInfoEntityTypeConfiguration).Assembly);
    }

    public IRouteTail RouteTail { get; set; }

    public IDbContextTransaction? CurrentTransaction { get; private set; }

    public IDbContextTransaction BeginTransaction()
    {
        try
        {

            if (_publisherTransactionFactory != null)
            {
                CurrentTransaction = _publisherTransactionFactory.BeginTransaction(this);
            }
            else
            {
                CurrentTransaction = Database.BeginTransaction();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        return CurrentTransaction;
    }

    public async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.CommitAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public async Task RollbackAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.RollbackAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction == null)
        {
            CurrentTransaction = this.BeginTransaction();
            await using (CurrentTransaction)
            {
                try
                {
                    await base.SaveChangesAsync(cancellationToken);
                    await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
                    await CommitAsync(cancellationToken);
                    return true;
                }
                catch
                {
                    await RollbackAsync(cancellationToken);
                    throw;
                }
            }
        }
        else
        {
            await base.SaveChangesAsync(cancellationToken);
            await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
            return true;
        }
    }
}
xuejmnet commented 3 months ago

不太清楚这段代码的意义

_publisherTransactionFactory.BeginTransaction(this);
xuejmnet commented 3 months ago

直接使用

using(var tran=dbcontex.Database.beginTransaction()){

tran.commit();
}
xiantaibai commented 3 months ago

_publisherTransactionFactory.BeginTransaction(this); 这个是用外部事务。 我重新封装了一个UnitOfWork,在仓储里面使用 事务。当执行这段代码时候 _context.Database.BeginTransaction(); 就会出现上面的错误

public class IotUnitOfWork : ITransactionUnitOfWork
{
    private readonly IotDbContext _context;

    public IotUnitOfWork(IotDbContext context)
    {
        _context = context;
    }

    public IDbContextTransaction? CurrentTransaction { get; private set; }

    public IDbContextTransaction BeginTransaction()
    {
        CurrentTransaction = _context.Database.BeginTransaction();
        return CurrentTransaction;
    }

    public async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.CommitAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public void Dispose()
    {
        _context.Dispose();
    }

    public async Task RollbackAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.RollbackAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
       return _context.SaveChangesAsync(cancellationToken);
    }

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction == null)
        {
            CurrentTransaction = this.BeginTransaction();
            await using (CurrentTransaction)
            {
                try
                {
                    await SaveChangesAsync(cancellationToken);
                    // await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
                    await CommitAsync(cancellationToken);
                    return true;
                }
                catch
                {
                    await RollbackAsync(cancellationToken);
                    throw;
                }
            }
        }
        else
        {
            await SaveChangesAsync(cancellationToken);
            // await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
            return true;
        }
    }
}

开启事务

msgRepository.Add(message);

var result = await msgRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
xuejmnet commented 3 months ago

@xiantaibai 先尝试一下普通的事务开启是否可以

xiantaibai commented 3 months ago

我使用原生 注入 ,是没问题的。改成 AddShardingDbContext 就会出现上面的问题

builder.Services.AddDbContext<IotDbContext>(options =>
{
    options.UseMySql(constr, ServerVersion.AutoDetect(constr), builder =>
    {
        builder.UseRelationalNulls();
    });
});
xuejmnet commented 3 months ago

我说的是addShardingDbcontext加普通模式开启事务

using(var tran=dbcontex.Database.beginTransaction()){

tran.commit();
}
xiantaibai commented 3 months ago

addShardingDbcontext加普通模式开启事务 报同样的错误,执行到 iotDbContext.Add(message); 报错

using (var transaction = await iotDbContext.Database.BeginTransactionAsync())
{
    try
    {
        iotDbContext.Add(message);
        var result = await iotDbContext.SaveChangesAsync(cancellationToken) > 0;
        await transaction.CommitAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
        await transaction.RollbackAsync();
    }
}
xiantaibai commented 3 months ago

这是启动配置

builder.Services.AddShardingDbContext<IotDbContext>()
//builder.Services.AddShardingConfigure<IotDbContext>()
 .UseRouteConfig(op =>
{
    op.AddShardingTableRoute<MessageVirtualTableRoute>();
})
.UseConfig(options =>
{
    //当无法获取路由时会返回默认值而不是报错
    options.ThrowIfQueryRouteNotMatch = false;

    options.UseShardingQuery((conStr, builder) =>
    {
        builder.UseMySql(constr, ServerVersion.AutoDetect(constr), builder =>
        {
            builder.UseRelationalNulls();
        }).UseLoggerFactory(efLogger);
    });

    options.UseShardingTransaction((conStr, builder) =>
    {
        builder.UseMySql(constr, ServerVersion.AutoDetect(constr), builder =>
        {
            builder.UseRelationalNulls();
        }).UseLoggerFactory(efLogger);
    });
    options.UseShardingMigrationConfigure(b =>
    {
        b.ReplaceService<IMigrationsSqlGenerator, ShardingMySqlMigrationsSqlGenerator>();
    });
    options.AddDefaultDataSource("ds0", constr);
})
 .ReplaceService<IDbContextCreator, AppDbContextCreator>(ServiceLifetime.Singleton)
.AddShardingCore();
builder.Services.Replace(ServiceDescriptor.Singleton<IDbContextCreator, AppDbContextCreator>());

builder.Services.AddScoped<IDeviceInfoRepository, DeviceInfoRepository>();
builder.Services.AddScoped<IMessageRepository, MessageRepositoryy>();
xuejmnet commented 3 months ago

@xiantaibai the current connection may be used.这句话的意思是你并发了吧这个链接在a线程被使用了b线程不能被用了,所以看是否有并发的用法导致的比如Task.Run内