ecodeclub / eorm

简单 ORM 框架
Apache License 2.0
194 stars 64 forks source link

分库分表:Inserter 支持分库分表 #194

Closed flycash closed 1 year ago

flycash commented 1 year ago

仅限中文

使用场景

对于插入来说,可能得场景是:

所以总结起来注意事项就是:

批量插入

如果批量插入的话,那么这一批数据里面可能要插入到不同的数据库表里面。例如说在

INSERT INTO `order`(`id`, `user_id`) VALUES((1, 2), (2, 3), (3, 4), (4, 5)) 

假如说 Order 是按照 user_id %2 来进行的,那么显然它应该拆成两批:

INSERT INTO `order_0`(`id`, `user_id`) VALUES((1, 2), (3, 4)) 
INSERT INTO `order_1`(`id`, `user_id`) VALUES((2, 3), (4, 5)) 

在不考虑事务的情况下,这两个可以直接并行执行。

因为目前事务机制还在设计之中 #189 ,所以在分多批次的情况下,就暂时直接执行,不需要考虑 ACID 的问题。等后续分布式事务已经支持了之后我们再进一步尝试允许使用不同的模式来插入,例如要求在 XA 模式下插入,以保证 ACID 的特性。

主键问题

在部分情况下,用户可能会希望我们帮他们解决主键的问题。例如说插入结构体:

type User struct {
    Id int
    Name string
}

插入的时候它只想执行 NewInserter(&User{Name: "Tom"}),而后用户预期我们会用他们提前配置好的主键生成策略,甚至于希望我们用整个 eorm 层面上的默认主键生成策略。

那么目前来说,我认为这种场景是不需要考虑的,这不是在 eorm 层面上应该考虑的问题,或者至少不是应该采用侵入式方案解决的场景。

所以也就是在执行插入的时候,我们对主键生成的问题一点都不关心。如果用户需要主键,他们应该自己解决。

这也就是我认为在设计分库分表功能的一个核心要点:主键生成不是分库分表的核心逻辑,而是属于一个扩展功能。也因此,我们后面应该考虑提供这种扩展功能的实现。不过本身不应该做成一个核心功能。

在当前这个 issue 里面我们不需要考虑这个问题。

禁用跨表批量插入

有些用户或者公司是不允许在批量插入的时候同时操作多个数据库表的。于是他们可能要求 eorm 如果发现这是一个跨表的批量插入,就直接报错,而不需要执行。

在这种情况下,我依旧认为这并不算是分库分表的核心逻辑。我们后续可以考虑通过 AOP 机制来发现和拦截这一类请求。

可行方案

按照已有的 Inserter 结构,我们可以获取所有插入的列。如果用户没有指定任何列,那么就是插入所有的列。

因为本身 Inserter 涉及到一个主键相关的调用 SkipPK,所以如果同样设置了 SkipPK 那么就应该忽略掉这个列。那么如果 PK 和 SK 是同一个列,那么如果设置了 SkipPK,就意味着没有 SK(虽然本身可能有值,但是用户要求忽略这个值),所以也应该报错。

构建 SQL

那么在构建 SQL 的整体逻辑应该遵循:

func (i *Inserter[T]) Build() (Query, error) {
    // i.quote(i.meta.TableName) 这个是原本的逻辑

      // key 是目标表,要考虑分数据源,分 db,再分表
       qs := map[string]any
       // 先对 values 按照目标表进行分组
       for index, val := range i.values {
             dst := findDst(val.SK)
            // 这里会完整构建出来
            qs[dst] = val
       }
     // 遍历 qs 里面的组,构建出来 Query
     // 返回值也要改成 []Query
}

其中找到 Dst,就调用对应的分库分表算法,等价于Request{Op: EQ, skvalues: {"sk", val.SK}}。注意这里在循环内部是一个个调用过去,而不是批量操作,因为这里我们并不能假设 Sharding 方法返回的 Dst 和我们批量传入的Request{Op: IN, skvalues: {"sk", skVals}}顺序一致。

倒也可以用 IN,那么我们在 Build 的时候就要再次分组,因为我们的目标是 INSERT INTO order_0 values((1), (2)) 而不是 INSERT INTO order_0 values(1); INSERT into order_1 values(2)

执行

执行的时候已经显得很简单了,遍历前面构造出来的 Query,而后执行就可以。 但是在返回结果的时候需要返回一个结构体,并且将它放进去 merger 里面。它会实现 sql.Result 接口

type MultiExecRes struct{
     res []sql.Result
}

func (m MultiExecRes) LastInsertId() {
    // 这个方法对于用户来说没什么意义,我们就返回最后一个
}
func (m MultiExecRes) RowsAffected() {
// 遍历,并且求和
}

那么整个 Exec 方法会收集所有的 error,并且是并行执行:

func (i Inserter) Exec () {
   qs := i.Build()
   errs := make([]error, len(qs))
   for idx, q := range qs {
      go func(idx, q Query) {
           ds := findDs(q.DataSource)
           res, err := ds.Exec(q)
           // 理论上在我们提前预先分配好了内存的情况下,这里我们不需要并发保护
          // 但是可能 -race 或者静态代码检查无法通过,那么就只能引入锁或者 channel
           qs[idx]= err
     }(idx, q)
   }
   return merger.NewMultiExecRes(resList), multierr.Combine(errs)
}

其它