ecodeclub / eorm

简单 ORM 框架
Apache License 2.0
191 stars 64 forks source link

分库分表:datasource-简单的分布式事务方案支持 #189

Closed Stone-afk closed 6 months ago

Stone-afk commented 1 year ago

datasouce-分布式事务

背景

Datasource 抽象主要是为了提供一个统一的对数据源的抽象的。目前基于分库分表的功能下实现了主从数据库、主从集群,那么必然要考虑如何协同多个数据库之间的事务(包括跨集群不同数据库之间的事务),  提供支持多个数据库的事务功能,这就涉及到了分布式事务的场景。

什么是本地事务?

本地事务就是用关系数据库来控制事务,关系数据库通常都具有ACID特性,传统的单体应用通常会将数据全部存储在一个数据库中,会借助关系数据库来完成事务控制。

什么是分布式事务?

在分布式系统中一次操作由多个系统协同完成,这种一次事务操作涉及多个系统通过网络协同完成的过程称为分布式事务。这里强调的是多个系统通过网络协同完成一个事务的过程,并不强调多个系统访问了不同的数据库,即使多个系统访问的是同一个数据库也是分布式事务,如下图:

image-20230408123150081.png

另外一种分布式事务的表现是,一个应用程序使用了多个数据源连接了不同的数据库,当一次事务需要操作多个数据源,此时也属于分布式事务,当系统作了数据库拆分后会出现此种情况。

image-20230408123226939.png

上面两种分布式事务表现形式以第一种据多。 单体库中事务比较容易做到ACID,但是分布式库中比较难做到,因为事务里面的事件是在不同的系统中,系统之间的联动是比较复杂和消耗的。我们来看看分布式库中事务的四大特性会遇到啥问题:

如何进行分布式事务控制?

CAP理论是分布式事务处理的理论基础,了解了CAP理论有助于我们研究分布式事务的处理方案。 CAP理论是:分布式系统在设计时只能在一致性(Consistency)、可用性(Availability)、分区容忍性(Partition Tolerance)中满足两种,无法兼顾三种。 image-20230408170903763.png

分布式系统只能满足三项中的两项而不可能满足全部三项。允许一个节点更新状态会导致数据不一致,即丧失了C性质。如果为了保证数据一致性,将分区一侧的节点设置为不可用,那么又丧失了A性质。除非两个节点可以互相通信,才能保证C又保证A,这个又会丧失P性质。

使用场景

行业方案

两阶段提交(2PC)

2PC是数据一致性协议,与Paxos协议,Raft协议一样,都是在分布式系统下如何保证数据的一致性而衍生出的协议,不过2PC(两阶段提交协议)主要在分布式事务中使用。它将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase)。

一个单减库存的例子: image-20230409163851185.png

  1. 应用程序连接两个数据源。
  2. 应用程序通过事务协调器向两个库发起prepare,两个数据库收到消息分别执行本地事务(记录日志),但不提交,如果执行成功则回复yes,否则回复no。
  3. 事务协调器收到回复,只要有一方回复no则分别向参与者发起回滚事务,参与者开始回滚事务。
  4. 事务协调器收到回复,全部回复yes,此时向参与者发起提交事务。如果参与者有一方提交事务失败则由事务协调器发起回滚事务。

2PC解决方案-XA

前提: 支持 XA 协议事务的数据库。 什么是**XA**协议(规范)? 在了解XA规范之前,存在着一个DTP(Distributed Transaction Processing Reference Model)模型,该模型规范了分布式事务的模型设计。 DTP 模型定义如下角色:

DTP 模型定角色的交互方式:

  1. TM 向 AP(应用程序) 提供应用程序编程接口,AP 通过 TM 提交或回滚事务。
  2. TM 交由中间件通过 XA 接口来通知 RM 数据库事务的开始、结束以及提交、回滚等。

DTP 模型定义TM和RM之间通讯的接口规范叫 XA,简单理解为数据库提供的 2PC 接口协议,基于数据库的 XA 协议来实现 2PC 又称为 XA 方案。 整个 2PC 的事务流程涉及到三个角色 AP、RM、TM。AP 指的是使用 2PC 分布式事务的应用程序;RM 指的是资源管理器,它控制着分支事务;TM 指的是事务管理器,它控制着整个全局事务。

image-20230409152655634.png

小结: 基于 XA 协议实现的分布式事务两阶段提交(2PC),对业务侵入很小,它最大的优势就是对使用透明,用户可以像使用本地事务一样使用基于 XA 协议的分布式事务,能够严格保障事务 ACID 特性。 可 2PC的缺点也是显而易见,它是一个强一致性的同步阻塞协议,事务执行过程中需要将所需资源全部锁定,也就是俗称的 刚性事务。所以它比较适用于执行时间确定的短事务,(资源锁需要等到两个阶段结束才释放)整体性能较差。 一旦事务协调者宕机或者发生网络抖动,会让参与者一直处于锁定资源的状态或者只有一部分参与者提交成功,导致数据的不一致。因此,在并发性能上的场景中,基于 XA 协议的分布式事务并不是最佳选择。 需要本地数据库支持XA协议。

事务补偿(TCC

TCC 是 Try、Confirm、Cancel 三个词语的缩写,TCC 要求每个分支事务实现三个操作:预处理 Try、确认 Confirm、撤销 Cancel

TM 首先发起所有的分支事务的 Try 操作,任何一个分支事务的Try操作执行失败,TM 将会发起所有分支事务的 Cancel 操作,若 Try 操作全部成功,TM 将会发起所有分支事务的 Confirm 操作。 下边用一个下单减库存的业务为例来说明:

image-20230409155839256.png

  1. Try
    • 下单业务由订单服务库存服务协同完成,在try阶段订单服务和库存服务完成检查和预留资源。
    • 订单服务检查当前是否满足提交订单的条件(比如:当前存在未完成订单的不允许提交新订单)。
    • 库存服务检查当前是否有充足的库存,并锁定资源。
  2. Confirm
    • 订单服务和库存服务成功完成Try后开始正式执行资源操作。
    • 订单服务向订单表写一条订单信息。
    • 库存服务减去库存。
  3. Cancel
    • 如果订单服务和库存服务有一方出现失败则全部取消操作。
    • 订单服务需要删除新增的订单信息。
    • 库存服务将减去的库存再还原

注意:TCC的 try/confirm/cancel 接口都要实现幂等性,因为在try、confirm、cancel 失败后要不断重试。 什么是幂等性? 幂等性是指同一个操作无论请求多少次,其结果都相同。 幂等操作实现方式有

三阶段提交(**3PC**)

3PC主要是解决协调者与参与者通信阻塞问题而产生的,它比2PC传递的消息还要多,性能不高。 image.png 与两阶段提交不同的是,三阶段提交有两个改动点。

也就是说,除了引入超时机制之外,3PC2PC的准备阶段再次一分为二,这样三阶段提交就有CanCommitPreCommitDoCommit三个阶段。 **CanCommit**阶段 3PCCanCommit阶段其实和2PC的准备阶段很像。协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。

**PreCommit**阶段 协调者根据参与者的反应情况来决定是否可以记性事务的PreCommit操作。根据响应情况,有以下两种可能。 假如协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务的预执行。

  1. 发送预提交请求:  协调者向参与者发送PreCommit请求,并进入Prepared阶段。
  2. 事务预提交:参与者接收到PreCommit请求后,会执行事务操作,并将undo和redo信息记录到事务日志中。
  3. 响应反馈:如果参与者成功的执行了事务操作,则返回ACK响应,同时开始等待最终指令。

假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。

  1. 发送中断请求:协调者向所有参与者发送abort请求。
  2. 中断事务:参与者收到来自协调者的abort请求之后(或超时之后,仍未收到协调者的请求),执行事务的中断。

**doCommit**阶段 该阶段进行真正的事务提交,也可以分为以下两种情况。 执行提交

中断事务 协调者没有接收到参与者发送的ACK响应(可能是接受者发送的不是ACK响应,也可能响应超时),那么就会执行中断事务。

优点(相对于**2PC**

  1. 最大的优点就是降低了参与者的阻塞范围(因为在开始事务之前确定是否能执行事务,而不是事务之中确定)
  2. 能够在出现单点故障后继续达成一致。

缺点 三阶段提交协议在去除阻塞的同时也引入了新的问题,那就是在参与者接收到preCommit消息后,如果网络出现分区,此时协调者所在的节点和参与者无法进行正常的网络通信,在这种情况下,该参与者依然会进行事务的提交,这必然出现数据的不一致性。

分布式解决方案Seata

Seata 是由阿里中间件团队发起的开源项目 Fescar,后更名为 Seata,它是一个是开源的分布式事务框架。 Seata 也是从两阶段提交演变而来的一种分布式事务解决方案,提供了 柔性事务ATTCCSAGA)事务模式 与刚性事务 XA 事务模式,为用户打造一站式的分布式解决方案,详细可参考[官方文档](https://seata.io/zh-cn/docs/overview/what-is-seata.html)**Seata**中定义了三种角色

设计

分库分表里面很少会用严格的 ACID 的分布式事务,所以 第一步 要实现的就是  try best 事务,不管成功失败但不保证 ACID,直接返回错误。这是对于分布式事务最简单的一个支持

方案一: delay Tx

顾名思义就是延时事务,当执行了第一条语义的时候才会考虑在对应的数据库上开启事务。 流程图如下:

为了方便后续扩展,在 datasource 内设计一个全局的 context 用来维护一个执行一个 分布式Tx 时所携带的上下文,后续可扩展为携带 Tx 上下文和 sql parse 的 datasource 的上下文;

type Context struct {
    TxName string
    TxCtx  context.Context
    Opts   *sql.TxOptions
}

DelayTx 维护一个 保存 []datasource.Tx 的数组, 可以是普通的 TX 结构体 ptr,也可以是 TryBestTx 结构体 ptr;

type DelayTx struct {
    lock      sync.RWMutex
    txs       map[string]datasource.Tx
    ctx       datasource.Context
    beginners map[string]datasource.TxBeginner
}

func (t *DelayTx) Query(ctx context.Context, query datasource.Query) (*sql.Rows, error) {
    // 防止 GetMulti 的查询重复创建多个事务
    t.lock.RLock()
    tx, ok := t.txs[query.DB]
    t.lock.RUnlock()
    if ok {
        return tx.Query(ctx, query)
    }
    t.lock.Lock()
    defer t.lock.Unlock()
    if tx, ok = t.txs[query.DB]; ok {
        return tx.Query(ctx, query)
    }
    db, ok := t.beginners[query.DB]
    if !ok {
        return nil, errs.ErrNotFoundTargetDB
    }
    tx, err := db.BeginTx(t.ctx.TxCtx, t.ctx.Opts)
    if err != nil {
        return nil, err
    }
    t.txs[query.DB] = tx
    return tx.Query(ctx, query)
}

func (t *DelayTx) Exec(ctx context.Context, query datasource.Query) (sql.Result, error) {
    // 防止 GetMulti 的查询重复创建多个事务
    t.lock.RLock()
    tx, ok := t.txs[query.DB]
    t.lock.RUnlock()
    if ok {
        return tx.Exec(ctx, query)
    }
    t.lock.Lock()
    defer t.lock.Unlock()
    if tx, ok = t.txs[query.DB]; ok {
        return tx.Exec(ctx, query)
    }
    db, ok := t.beginners[query.DB]
    if !ok {
        return nil, errs.ErrNotFoundTargetDB
    }
    tx, err := db.BeginTx(t.ctx.TxCtx, t.ctx.Opts)
    if err != nil {
        return nil, err
    }
    t.txs[query.DB] = tx
    return tx.Exec(ctx, query)
}

func (t *DelayTx) Commit() error {
    var err error
    for name, tx := range t.txs {
        if er := tx.Commit(); er != nil {
            err = multierr.Combine(
                err, fmt.Errorf("masterslave DB name [%s] error: %w", name, er))
        }
    }
    return err
}

func (t *DelayTx) Rollback() error {
    var err error
    for name, tx := range t.txs {
        if er := tx.Rollback(); er != nil {
            err = multierr.Combine(
                err, fmt.Errorf("masterslave DB name [%s] error: %w", name, er))
        }
    }
    return err
}

func NewDelayTx(ctx datasource.Context, beginners map[string]datasource.TxBeginner) *DelayTx {
    return &DelayTx{ctx: ctx, beginners: beginners}
}

方案二: 禁止跨库 Tx

在某些要求事物的强一致性的场景下是不允许跨库事物的,所以需要为 datasource 的提供禁止跨库的事物的支持。

type BinMultiTx struct {
    DB        string
    lock      sync.RWMutex
    tx        datasource.Tx
    ctx       datasource.Context
    beginners map[string]datasource.TxBeginner
}

func (t *BinMultiTx) Query(ctx context.Context, query datasource.Query) (*sql.Rows, error) {
    t.lock.RLock()
    if t.DB != "" && t.tx != nil {
        return t.tx.Query(ctx, query)
    }
    t.lock.RUnlock()

    t.lock.Lock()
    defer t.lock.Unlock()
    if t.DB != "" && t.tx != nil {
        return t.tx.Query(ctx, query)
    }
    db, ok := t.beginners[query.DB]
    if !ok {
        return nil, errs.ErrNotFoundTargetDB
    }
    tx, err := db.BeginTx(t.ctx.TxCtx, t.ctx.Opts)
    if err != nil {
        return nil, err
    }
    t.tx = tx
    t.DB = query.DB
    return tx.Query(ctx, query)
}

func (t *BinMultiTx) Exec(ctx context.Context, query datasource.Query) (sql.Result, error) {
    // 防止 GetMulti 的查询重复创建多个事务
    t.lock.RLock()
    if t.DB != "" && t.tx != nil {
        return t.tx.Exec(ctx, query)
    }
    t.lock.RUnlock()

    t.lock.Lock()
    defer t.lock.Unlock()
    if t.DB != "" && t.tx != nil {
        return t.tx.Exec(ctx, query)
    }
    db, ok := t.beginners[query.DB]
    if !ok {
        return nil, errs.ErrNotFoundTargetDB
    }
    tx, err := db.BeginTx(t.ctx.TxCtx, t.ctx.Opts)
    if err != nil {
        return nil, err
    }
    t.tx = tx
    t.DB = query.DB
    return tx.Exec(ctx, query)
}

func (t *BinMultiTx) Commit() error {
    if t.tx != nil {
        return t.tx.Commit()
    }
    return nil
}

func (t *BinMultiTx) Rollback() error {
    if t.tx != nil {
        return t.tx.Rollback()
    }
    return nil
}

func NewBinMultiTx(ctx datasource.Context, beginners map[string]datasource.TxBeginner) *BinMultiTx {
    return &BinMultiTx{ctx: ctx, beginners: beginners}
}
为了方便管理不同类型的 分布式 Tx,所以这里引入 TxType 常量来支持创建不同的 分布式Tx类型 以便提高后续引入 XA 方案的扩展性。
const (
    Delay    = "delay"
    BinMulti = "binMulti"
)

type TypeKey struct{}

func UsingTxType(ctx context.Context, val string) context.Context {
    return context.WithValue(ctx, TypeKey{}, val)
}

func GetCtxTypeKey(ctx context.Context) any {
    return ctx.Value(TypeKey{})
}

type TxFactory interface {
    TxOf(ctx datasource.Context, b map[string]datasource.TxBeginner) (datasource.Tx, error)
}

type DelayTxFactory struct{}

func (_ DelayTxFactory) TxOf(ctx datasource.Context, b map[string]datasource.TxBeginner) (datasource.Tx, error) {
    return NewDelayTx(ctx, b), nil
}

type BinMultiTxFactory struct{}

func (_ BinMultiTxFactory) TxOf(ctx datasource.Context, b map[string]datasource.TxBeginner) (datasource.Tx, error) {
    return NewBinMultiTx(ctx, b), nil
}

type TxFacade struct {
    factory   TxFactory
    ctx       datasource.Context
    beginners map[string]datasource.TxBeginner
}

func NewTxFacade(ctx context.Context, beginners map[string]datasource.TxBeginner) (TxFacade, error) {
    res := TxFacade{
        beginners: beginners,
    }
    switch GetCtxTypeKey(ctx).(string) {
    case Delay:
        res.factory = DelayTxFactory{}
        return res, nil
    case BinMulti:
        res.factory = BinMultiTxFactory{}
        return res, nil
    default:
        return TxFacade{}, errors.New("不支持的分布式事务类型")
    }
}

func (t *TxFacade) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    dsCtx := datasource.Context{
        TxCtx:  ctx,
        Opts:   opts,
        TxName: GetCtxTypeKey(ctx).(string),
    }

    return t.factory.TxOf(dsCtx, t.beginners)
}
在 clusterDB 和 shardingDatasource 中对事务的支持。
func (c *clusterDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    beginners := map[string]datasource.TxBeginner{}
    for name, db := range c.masterSlavesDBs {
        beginners[name] = db
    }
    facade, err := transaction.NewTxFacade(ctx, beginners)
    if err != nil {
        return nil, err
    }

    return facade.BeginTx(ctx, opts)
}

func (s *ShardingDataSource) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    beginners := map[string]datasource.TxBeginner{}
    for name, ds := range s.sources {
        beginners[name] = ds.(datasource.TxBeginner)
    }
    facade, err := transaction.NewTxFacade(ctx, beginners)
    if err != nil {
        return nil, err
    }

    return facade.BeginTx(ctx, opts)
}

测试

分布式事务的测试的核心是:确保在 Begin 的时候一定 Begin 了,在 Rollback 的时候都触发 Rollback 了,在 Commit 的时候都触发 Commit 了。

单元测试

集成测试

参考链接

longyue0521 commented 1 year ago

@Stone-afk 将文中所参考引用的文章链接全部列出来

# reference
- xxxx
- yyyy
flycash commented 1 year ago

DelayTx

禁止跨库事务

这个跨库事务应该就是原本的单一数据库事务的一个封装,或者说装饰器。它装饰器额外的逻辑就是在执行任何方法的时候判定查询的目标 datasource 和 db name 有没有发生变化。

在第一阶段我们可以暂时不用实现这个部分。

测试

测试用例在 DelayTx 的时候要考虑延迟开启事务。所以在设计用例的时候要执行一个语句就判定对应的数据库上的 Tx 已经开启了。在 Commit 或者 Rollback 的时候要求所有的对应的 Tx 都已经触发了动作。那么再叠加 Begin、Commit、Rollback 失败和部分失败的情况。

flycash commented 1 year ago

另外再补充一个 datasource.ShardingDB 和 clusterDB 对 DelayTx 的支持。这两个都会被影响到。

Stone-afk commented 1 year ago

我写了啊,两个都有对事务的支持,问题是如果要支持禁止跨库事务以及之后的XA,就要考虑引入 TxManager,管理不同类型的··事务,之后往 TM 和 RM 那样的关系改造

Stone-afk commented 1 year ago
func (s *ShardingDataSource) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    dsCtx := datasource.Context{
        TxCtx: ctx,
        Opts:  opts,
    }
    beginners := map[string]datasource.TxBeginner{}
    for name, ds := range s.sources {
        beginners[name] = ds.(datasource.TxBeginner)
    }
    return transaction.NewManager(dsCtx, beginners), nil
}
Stone-afk commented 1 year ago

维护 txBeginner 是为了兼容 ShardingSource 也要开启事务那么 shardiingSource 里保存的可以是任何类型的db,且他们都实现了 txBeginner 接口

Stone-afk commented 1 year ago

关于 ShardingSource 那里,还有其他更好的方案吗 @flycash

flycash commented 1 year ago

我写了啊,两个都有对事务的支持,问题是如果要支持禁止跨库事务以及之后的XA,就要考虑引入 TxManager,管理不同类型的··事务,之后往 TM 和 RM 那样的关系改造

不是,我是说你在 ShardingDB 和 clusterDB 的 Exec 和 Query 里面你会怎么处理。因为你的延迟实物是需要在执行语句的时候才知道要在哪个数据库上开启,应该是补充在 DelayTx 执行 Query 和 Exec 的时候,它怎么判断事务在目标数据源上有没有开启,要不要开启。这里就要深入讨论你是不是要做成现成安全了。

我的理解是 TxManager 在 DelayTx 是用不上的。因为 DelayTx 本身就不符合 XA 协议。如果你引入了 TxManager 的话,那么TxManager在Query和Exec的时候干啥呢?

我感觉,应该是:

func (c *clusterDB) Begin() (datasource.Tx, error) {
    // 当然,这里可能返回别的事务,要做一个分发。这里举例子就直接返回 DelayTx
    return DelayTx {
         beginers: map[string]datasource.Beginer
    }, nil
}

然后在 ShardingDB 里面也是类似的。接近于一种层层委托的概念。

所以我不知道 TxManager 在这里扮演的角色是什么。如果将来我们准备支持 XA,那么我预期用法应该是:

func (c *clusterDB) Begin() (datasource.Tx, error) {    
     return XATx {
         // XATx 在逻辑上就扮演了TxManager 的角色,只是不叫 TxManager而已。
    }, nil
}
Stone-afk commented 1 year ago

等我补个流程图先 =。=

Stone-afk commented 1 year ago

“我的理解是 TxManager 在 DelayTx 是用不上的”,主要是想做成一个工厂模式,因为用户调用 BeginTx 方法,希望用户可以指定期望的Tx对同时也方便之后扩展XA。

Stone-afk commented 1 year ago

这不太可行,因为要XA要保证强一致,TM 是用来 管理全局事务的,所以工厂模式不应该由 TxManager 来管

flycash commented 1 year ago

“我的理解是 TxManager 在 DelayTx 是用不上的”,主要是想做成一个工厂模式,因为用户调用 BeginTx 方法,希望用户可以指定期望的Tx对同时也方便之后扩展XA。

哦哦哦,那么还是叫做什么 TxFactory 比较合适。或者你也可以考虑做成门面模式,里面藏着不同的实现。

flycash commented 1 year ago

这不太可行,因为要XA要保证强一致,TM 是用来 管理全局事务的,所以工厂模式不应该由 TxManager 来管

我觉得应该是 TxFacotry -> XATx。然后 XATx 本身是 TxManager 或者 XATx 里面维护一个 TxManager,这个TxManager 就是用来管 XA 事务的。

flycash commented 1 year ago
    //这里理论上不会产出并发安全的问题
    if tx, ok := t.txs[query.DB]; ok {
        return tx.Query(ctx, query)
    }

这个地方要用读锁保护起来。或者说,如果你觉得用户不应该在多个 goroutine 里面使用,那么就可以彻底不用锁。如果用了锁的话,你就要全部都用锁保护起来。而且在这个地方,是一个典型的 check - do - something 的场景,所以实际上你需要用 double-check 的写法。

另外一个我建议在 CtxWithTypeKey 啥的,不如改名为:

UsingTxType(xxx)

那么调用起来的感觉就是:ds.Begin(datasource.UsingTxType(ctx), xxx)。其实就是觉得 CtxWithTypeKey 的可读性并不是很高。

另外就是你的图不够形象。你可以考虑就是把这几个流程都画出来:

在设计里面你要专门开一个小节,把容错相关的内容都放进去:

Stone-afk commented 1 year ago

@flycash “考虑做成门面模式” 外观模式好像不搭边吧,外观模式不是由多个子系统组合成一个大的系统对外提供接口吗,貌似设计思路不搭边,想象不出来这里如何用外观模式

flycash commented 1 year ago

你可以先弄一个合并请求出来,把主体搞出来,然后可以暂时不写测试。我看看效果。尤其是几个Datasource 上的 begin 方法

Stone-afk commented 1 year ago

等我把 外观 模式的先补上

Stone-afk commented 1 year ago
type TxFacade struct {
    f         *TxFactory
    ctx       datasource.Context
    beginners map[string]datasource.TxBeginner
}

func (t *TxFacade) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    dsCtx := datasource.Context{
        TxCtx:  ctx,
        Opts:   opts,
        TxName: transaction.GetCtxTypeKey(ctx).(string),
    }

    beginners := map[string]datasource.TxBeginner{}
    for name, db := range c.masterSlavesDBs {
        beginners[name] = db
    }
    return transaction.NewManager(dsCtx, beginners), nil
}

如果 TxFacade 实现 Beginner 那么它无法传递 beginners map[string]datasource.TxBeginner

Stone-afk commented 1 year ago

@flycash

Stone-afk commented 1 year ago
type TxFactory interface {
    TxOf(ctx datasource.Context, b map[string]datasource.TxBeginner) (datasource.Tx, error)
}

type DelayTxFactory struct{}

func (_ DelayTxFactory) TxOf(ctx datasource.Context, b map[string]datasource.TxBeginner) (datasource.Tx, error) {
    return NewDelayTx(ctx, b), nil
}

type BinMultiTxFactory struct{}

func (_ BinMultiTxFactory) TxOf(ctx datasource.Context, b map[string]datasource.TxBeginner) (datasource.Tx, error) {
    return NewBinMultiTx(ctx, b), nil
}

type TxFacade struct {
    factory   TxFactory
    ctx       datasource.Context
    beginners map[string]datasource.TxBeginner
}

func NewTxFacade(ctx context.Context, beginners map[string]datasource.TxBeginner) (TxFacade, error) {
    res := TxFacade{
        beginners: beginners,
    }
    switch GetCtxTypeKey(ctx).(string) {
    case Delay:
        res.factory = DelayTxFactory{}
        return res, nil
    case BinMulti:
        res.factory = BinMultiTxFactory{}
        return res, nil
    default:
        return TxFacade{}, errors.New("不支持的分布式事务类型")
    }
}

func (t *TxFacade) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    dsCtx := datasource.Context{
        TxCtx:  ctx,
        Opts:   opts,
        TxName: GetCtxTypeKey(ctx).(string),
    }

    return t.factory.TxOf(dsCtx, t.beginners)
}
Stone-afk commented 1 year ago
func (c *clusterDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    beginners := map[string]datasource.TxBeginner{}
    for name, db := range c.masterSlavesDBs {
        beginners[name] = db
    }
    facade, err := transaction.NewTxFacade(ctx, beginners)
    if err != nil {
        return nil, err
    }

    return facade.BeginTx(ctx, opts)
}
Stone-afk commented 1 year ago
func (s *ShardingDataSource) BeginTx(ctx context.Context, opts *sql.TxOptions) (datasource.Tx, error) {
    beginners := map[string]datasource.TxBeginner{}
    for name, ds := range s.sources {
        beginners[name] = ds.(datasource.TxBeginner)
    }
    facade, err := transaction.NewTxFacade(ctx, beginners)
    if err != nil {
        return nil, err
    }

    return facade.BeginTx(ctx, opts)
}
flycash commented 1 year ago

暂时就这样吧,琢磨来琢磨去我也暂时没想到更好的办法。