Closed 2881099 closed 3 years ago
事务传播使用过程中发现一个BUG 如下面的代码
public void FunMain(){
//代码1:
//事务处理方法
FuncA();
//代码2:
//读取、前面FuncA中添加、更新的数据
.......
}
[Transaction]
public virtual void FunA(){
//代码2.1
//查询数据,或不访问数据库
.........
//代码2.2
//添加、更新数据到数据库
FuncA_1();
}
[Transaction]
public virtual void FuncA_1(){
//代码2.2.1
//添加、更新数据到数据库
.......
}
执行完“代码1”后,“代码2”访问不到前面的事务中提交的数据,只有当方法FunMain全部结束后,数据才真正的提交到数据库。 我分析原因可能如下: 1、方法FunA和FuncA_1都标记了事务 2、首先进入FunA,UnitOfWorkManager标记启动事务, 3、“代码2.1”到“代码2.2”之间并没有对数据库执行“增、删、改”操作,所以并没有真正启动事务 4、然后“代码2.2”中,FunA中调用了FuncA_1,FuncA_1中传递了工作单元,UnitOfWorkManager标记启动事务 5、FuncA_1中对数据库执行了“增、删、改”操作,真正启动了事务 6、最后,FuncA_1执行完成提交事务,FunA执行完成提交事务
在第3步并没有真正启动事务,可能导致了Bug,执行完第6步后并没有真正把数据提交到数据库。 如果在第3步中,增加修改数据库的代码,那么第3步时就会真正启动事务,执行完第6步后,数据库里就有提交的数据。
另外请问,如果启用了主从库。 当启用事务时,会自动读取主库吗? 如果类似上面第3步的情况,会自动读取主库吗?
我确认了一下,开启事务或使用工作单元标记事务后,都是默认读取主库
有没有demo,我试试
我是直接在现有的项目中测试的,Demo一下子剥离不出来
TransactionAttribute 的 动态代理,有执行 OnBefore(Begin) OnAfter(Commit) 吗
fsql.Aop.TraceBefore fsql.Aop.TraceAfter
这两个事件监视起来
OK,我试试 fsql.Aop.TraceBefore, fsql.Aop.TraceAfter
上面的那段代码, fsql.Aop.TraceBefore:执行“代码1“时,启动了一次事务 fsql.Aop.TraceAfter:执行完”代码1“时,这个跟踪没有被触发,事务没有成功提交
经过测试,类似上面的代码,在同一个类中 在标记事务的方法内,调用另一个标记事务方法时。 跟踪结果是: fsql.Aop.TraceBefore:事务在第一次时启动了 fsql.Aop.TraceAfter:方法执行结束后,事务并没有真正提交。只有当前线程执行结束后,才会真正提交事务。
与我猜测的,第3步是否更新数据库没有关系。
其实可以换个测试方式
var uowm = new UnitOfWorkManager(fsql); var repo = new DefultRepository\<t>(fsql, uowm);
using (var uow1 = uow.Begin()) {
using (var uow2 = uow.Begin()) { repo.Insert(...); uow2.Commit(); } repo.Where(...).ToList(); //是否变化 uow1.Commit(); }
像这样整理一个 demo 传上来
@yus1977 这样测试可以吗?
@yus1977 这样测试可以吗?
OK,我找时间测试一下
经过测试,上面的代码没有问题,UnitOfWorkManager本身没有问题。 是我这边的问题,写的AOP有BUG。 我用的是Autofac,所以照着前面的TransactionalAttribute 重新写了AOP,TransactionInterceptor。 我的事务在Service对象上,当同一个对象多个标记了事务的方法嵌套时,抄过来的代码就会出问题
原因如下: 因为一个Servier实例通过AOP编织后,绑定的TransactionInterceptor是同一个实例。 如下面的代码中
IUnitOfWork _uow;
........
Task OnBefore(UnitOfWorkManager uowm)
{
_uow = uowm.Begin(this.Propagation, this.IsolationLevel);
return Task.FromResult(false);
}
同一个实例多次启用事务后,_uow会被后面启用的事务覆盖。 而第二次以后启动的事务创建的的工作单元是UnitOfWorkVirtual。 因此不能简单的使用变量来存储创建的IUnitOfWork
下面是完整的代码 AOP使用是的是 Autofac +Castle.DynamicProxy + Castle.Core.AsyncInterceptor
/// <summary>
/// 事务拦截器TranAOP, 支持同步和异步方法
/// </summary>
public class TransactionInterceptor : AsyncInterceptorBase
{
public TransactionInterceptor(UnitOfWorkManager uowManager, IConfiguration configuration)
{
_uowManager = uowManager;
this.configuration = configuration;
transactionMonitor = configuration["ContextAutoSetup:TransactionMonitor"].ToBool();
}
private readonly UnitOfWorkManager _uowManager;
private readonly IConfiguration configuration;
bool transactionMonitor = false;
/// <summary>
/// 无返回值的 异步/同步 方法拦截
/// </summary>
/// <param name="invocation"></param>
/// <param name="proceed"></param>
/// <returns></returns>
protected override async Task InterceptAsync(IInvocation invocation, Func<IInvocation, Task> proceed)
{
var methodInfo = invocation.MethodInvocationTarget ?? invocation.Method;
//对当前方法的特性验证
if (methodInfo.HasAttribute<TransactionAttribute>())
{
using (var unitOfWork = BeginTransaction(methodInfo))
{
try
{
await proceed(invocation).ConfigureAwait(false);
CommitTransaction(unitOfWork);
}
catch (Exception ex)
{
RollbackTransaction(unitOfWork);
throw ex;
}
}
}
else
await proceed(invocation).ConfigureAwait(false);
}
/// <summary>
/// 有返回值的 异步/同步 方法拦截
/// </summary>
/// <param name="invocation"></param>
/// <param name="proceed"></param>
/// <returns></returns>
protected override async Task<TResult> InterceptAsync<TResult>(IInvocation invocation, Func<IInvocation, Task<TResult>> proceed)
{
var methodInfo = invocation.MethodInvocationTarget ?? invocation.Method;
//对当前方法的特性验证
if (methodInfo.HasAttribute<TransactionAttribute>())
{
using (var unitOfWork = BeginTransaction(methodInfo))
{
try
{
var result = await proceed(invocation).ConfigureAwait(false);
CommitTransaction(unitOfWork);
return result;
}
catch (Exception ex)
{
RollbackTransaction(unitOfWork);
throw ex;
}
}
}
else
return await proceed(invocation).ConfigureAwait(false);
}
/// <summary>
/// 开始事务
/// </summary>
/// <param name="methodInfo"></param>
private IUnitOfWork BeginTransaction(MethodInfo methodInfo)
{
WriteConsole(0);
TransactionAttribute attribute = methodInfo.GetCustomAttribute<TransactionAttribute>();
//启动事务
return _uowManager.Begin(attribute.Propagation, attribute.IsolationLevel);
}
/// <summary>
/// //提交事务
/// </summary>
private void CommitTransaction(IUnitOfWork unitOfWork)
{
unitOfWork.Commit();
WriteConsole(1);
}
/// <summary>
/// 回滚事务
/// </summary>
private void RollbackTransaction(IUnitOfWork unitOfWork)
{
unitOfWork.Rollback();
WriteConsole(2);
}
/// <summary>
/// 控制台输出信息
/// </summary>
/// <param name="msg"></param>
/// <param name="type">0开始事务,1提交事务,2回滚事务</param>
private void WriteConsole(int type)
{
if (transactionMonitor)
{
string msg = "";
switch (type)
{
case 0:
msg = $"***Transaction Begin***\n";
break;
case 1:
msg = $"***Transactio Commit***\n";
break;
case 2:
msg = $"***Transaction* Rollback**\n";
break;
}
ConsoleHelper.WriteSuccessLine(msg);
}
}
}
}
默认 UnitOfWorkManager 只能绑定一个 FreeSql 使用,所以 Scoped 生命范围内不切换的情况下,可以实现 IdleBus + UnitOfWorkManager 以来实现。
第一步:定义 IdleBus 扩展方法
public static class IdleBusExtesions
{
static AsyncLocal<string> AsyncLocalTenantId = new AsyncLocal<string>();
public static IdleBus<IFreeSql> ChangeTenant(this IdleBus<IFreeSql> ib, string tenantId)
{
AsyncLocalTenantId.Value = tenantId;
return ib;
}
public static IFreeSql Get(this IdleBus<IFreeSql> ib) => ib.Get(AsyncLocalTenantId.Value ?? "default");
public static IBaseRepository<T> GetRepository<T>(this IdleBus<IFreeSql> ib) where T : class => ib.Get().GetRepository<T>();
}
第二步:定义 IFreeSql 代理类实现
class FreeSqlProxy : IFreeSql
{
readonly IFreeSql _orm;
public FreeSqlProxy(IFreeSql fsql)
{
_orm = fsql;
}
public IAdo Ado => _orm.Ado;
public IAop Aop => _orm.Aop;
public ICodeFirst CodeFirst => _orm.CodeFirst;
public IDbFirst DbFirst => _orm.DbFirst;
public GlobalFilter GlobalFilter => _orm.GlobalFilter;
//关键在此处,释放无任何操作
public void Dispose() { }
public void Transaction(Action handler) => _orm.Transaction(handler);
public void Transaction(IsolationLevel isolationLevel, Action handler) => _orm.Transaction(isolationLevel, handler);
public ISelect<T1> Select<T1>() where T1 : class
{
return _orm.Select<T1>();
}
public ISelect<T1> Select<T1>(object dywhere) where T1 : class => Select<T1>().WhereDynamic(dywhere);
public IDelete<T1> Delete<T1>() where T1 : class
{
return _orm.Delete<T1>();
}
public IDelete<T1> Delete<T1>(object dywhere) where T1 : class => Delete<T1>().WhereDynamic(dywhere);
public IUpdate<T1> Update<T1>() where T1 : class
{
return _orm.Update<T1>();
}
public IUpdate<T1> Update<T1>(object dywhere) where T1 : class => Update<T1>().WhereDynamic(dywhere);
public IInsert<T1> Insert<T1>() where T1 : class
{
return _orm.Insert<T1>();
}
public IInsert<T1> Insert<T1>(T1 source) where T1 : class => Insert<T1>().AppendData(source);
public IInsert<T1> Insert<T1>(T1[] source) where T1 : class => Insert<T1>().AppendData(source);
public IInsert<T1> Insert<T1>(List<T1> source) where T1 : class => Insert<T1>().AppendData(source);
public IInsert<T1> Insert<T1>(IEnumerable<T1> source) where T1 : class => Insert<T1>().AppendData(source);
public IInsertOrUpdate<T1> InsertOrUpdate<T1>() where T1 : class
{
return _orm.InsertOrUpdate<T1>();
}
}
第三步:改造 Startup.cs 注入
public static IdleBus<IFreeSql> ib = new IdleBus<IFreeSql>();
public void ConfigureServices(IServiceCollection services)
{
services.AddControllersWithViews();
//ib.Register(..);
//注意此时注入 AddScoped IFreeSql
services.AddScoped<IFreeSql>(serviceProvider => new FreeSqlProxy(ib.Get()));
services.AddScoped<UnitOfWorkManager>();
services.AddFreeRepository(null, typeof(Startup).Assembly);
}
第四步:使用方法请参照 一楼
.net 6.0,按照上面的式例,在方法添加Transactional特性,但是没有生效,不是进入事务里,这个是什么原因引起吗? Service相关代码: [Transactional] private async Task SaveFlowInfoAsync(FlowInfoAddInput flowInfoInput, CancellationToken cancellationToken) { //using IUnitOfWork unitOfWork = _unitOfWorkManager.Begin(isolationLevel: IsolationLevel.Snapshot); //try //{ await this._flowDetailService.AddAsync(flowInfoInput, cancellationToken); //其他业务逻辑 。。。
// unitOfWork.Commit();
//}
//catch (Exception e)
//{
// unitOfWork.Rollback();
// LogUtils.LogErr(this, $"流程提交异常:{e.Message}。PARAMS:{flowInfoInput.FlowSubmitInput.ToJson()}", e);
// throw new CustomException(ExceptionConst.FlowSubmitFail);
//}
}
programe.cs代码: var app = builder.Build();
//使用Correlation中间件 app.UseCorrelationId(); if (app.Environment.IsDevelopment()) { app.UseDeveloperExceptionPage(); } //使用自定义的异常中间件 app.UseErrorHandling(); app.UseStaticFiles(); app.UseLocalLogViewer(); app.Use(async (context, next) => { TransactionalAttribute.SetServiceProvider(context.RequestServices); await next(); }); //使用路由中间件 app.UseRouting() //使用终端中间件 .UseEndpoints(endpoints => { endpoints.MapGet("/", async context => { await context.Response.WriteAsync("Hello World!"); }); endpoints.MapControllers(); });
app.Run();
本篇文章内容引导,如何在 asp.net core 项目中使用特性(注解) 的方式管理事务。
支持六种传播方式(propagation),意味着跨方法的事务非常方便,并且支持同步异步:
第一步:引入动态代理库
第二步:配置 Startup.cs 注入、中间件
第三步:在 Controller 或者 Service 或者 Repository 中使用事务特性
是不是进方法就开事务呢?
不一定是真实事务,有可能是虚的,就是一个假的 unitofwork(不带事务)
也有可能是延用上一次的事务
也有可能是新开事务,具体要看传播模式
重写仓储实现
以上使用的是泛型仓储,那我们如果是重写一个仓储 如何保持和
UnitOfWorkManager
同一个事务呢。 继承现有的DefaultRepository<,>
仓储,实现自定义的仓储SongRepository.cs
,其中接口。
ISongRepository.cs
在 startup.cs 注入此服务