ecodeclub / eorm

简单 ORM 框架
Apache License 2.0
194 stars 64 forks source link

分库分表:ShardingSelector GetMulti 实现 #196

Closed flycash closed 1 year ago

flycash commented 1 year ago

仅限中文

设计

在结果集处理已经提供了不少实现之后,并且基于哈希算法的范围查询已经支持之后,我们现在可以考虑着手支持 GetMulti 了。

暂时你可以只支持最简单的范围查询,也就是在查询中不会带上以下组件的情况:

暂时通过支持 GetMulti 来初步验证 DataSource 和 Merger 合在一起的使用效果。

那么基本上代码会很简单:

qs := s.Build()
// 可以使用 errgoup 来控制并发执行
for _, q := range qs{

    eg.Do(func(){
        ds:= findDs(q.DataSource)
        rows, err := ds.Query(q)
        rowsList = append(rowsList, rows)
        return err
    })
    eg.Wait()
    mgr := batchmerger.New() // 还是 mergerbatch?,暂时直接创建一个实例,后面会考虑复用实例
    rows, err:= mgr.Merge(rowsList)
    // 复用没有分库分表的处理结果集的逻辑
}

上面是伪代码,在实现的过程中,要小心线程安全的问题。

你在测试的时候不需要测试全部范围查询的用例,因为这部分正确性已经通过 Selector.Build 方法来保证了。而后在测试的时候你要考虑以下场景,我们的目标是测试 GetMulti 本身:

Stone-afk commented 1 year ago

这里如何让 Row 也支持 valCreator *valuer.PrimitiveCreator ?

func (s *ShardingSelector[T]) GetMulti(ctx context.Context) ([]*T, error) {
    qs, err := s.Build(ctx)
    if err != nil {
        return nil, err
    }
    var rowsSlice []*sql.Rows
    var eg errgroup.Group
    for _, query := range qs {
        q := query
        eg.Go(func() error {
            s.lock.Lock()
            defer s.lock.Unlock()
            // TODO 利用 ctx 传递 DB name
            rows, err := s.db.queryContext(ctx, q)
            if err == nil {
                rowsSlice = append(rowsSlice, rows)
            }
            return err
        })
    }
    err = eg.Wait()
    if err != nil {
        return nil, err
    }

    mgr := batchmerger.NewMerger()
    rows, err := mgr.Merge(ctx, rowsSlice)
    var res []*T
    for rows.Next() {
        tp := new(T)
        if err = rows.Scan(tp); err != nil {
            return nil, err
        }
        res = append(res, tp)
    }
    return res, nil
Stone-afk commented 1 year ago

这样行不行通过 merge options 然后由 merge 传递

type Option func(meta *Merger)

type Merger struct {
    meta       *model.TableMeta
    valCreator *valuer.PrimitiveCreator
    cols       []string
}

func (m *Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, error) {
    if ctx.Err() != nil {
        return nil, ctx.Err()
    }
    if len(results) == 0 {
        return nil, errs.ErrMergerEmptyRows
    }
    for i := 0; i < len(results); i++ {
        err := m.checkColumns(results[i])
        if err != nil {
            return nil, err
        }
    }
    return &Rows{
        meta:       m.meta,
        valCreator: m.valCreator,
        rowsList:   results,
        mu:         &sync.RWMutex{},
        columns:    m.cols,
    }, nil
}

type Rows struct {
    meta       *model.TableMeta
    valCreator *valuer.PrimitiveCreator
    rowsList   []*sql.Rows
    cnt        int
    mu         *sync.RWMutex
    columns    []string
    closed     bool
    lastErr    error
}

func (r *Rows) Scan(dest ...any) error {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if r.lastErr != nil {
        return r.lastErr
    }
    if r.closed {
        return errs.ErrMergerRowsClosed
    }
    rows := r.rowsList[r.cnt]
    if r.valCreator != nil {
        val := r.valCreator.NewPrimitiveValue(dest, r.meta)
        if err := val.SetColumns(rows); err != nil {
            return err
        }
    }
    return rows.Scan(dest...)

}
flycash commented 1 year ago

这个应该不影响的吧?你只需要把 merger.Merge 之后返回的 rows 当成普通的 rows 来用就可以了。换句话说,原本我们在 Valuer 里面是直接操作的 sql.Rows,现在我们操作的就是 merger.Rows 了。

flycash commented 1 year ago

这样行不行通过 merge options 然后由 merge 传递

type Option func(meta *Merger)

type Merger struct {
  meta       *model.TableMeta
  valCreator *valuer.PrimitiveCreator
  cols       []string
}

func (m *Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, error) {
  if ctx.Err() != nil {
      return nil, ctx.Err()
  }
  if len(results) == 0 {
      return nil, errs.ErrMergerEmptyRows
  }
  for i := 0; i < len(results); i++ {
      err := m.checkColumns(results[i])
      if err != nil {
          return nil, err
      }
  }
  return &Rows{
      meta:       m.meta,
      valCreator: m.valCreator,
      rowsList:   results,
      mu:         &sync.RWMutex{},
      columns:    m.cols,
  }, nil
}

type Rows struct {
  meta       *model.TableMeta
  valCreator *valuer.PrimitiveCreator
  rowsList   []*sql.Rows
  cnt        int
  mu         *sync.RWMutex
  columns    []string
  closed     bool
  lastErr    error
}

func (r *Rows) Scan(dest ...any) error {
  r.mu.RLock()
  defer r.mu.RUnlock()
  if r.lastErr != nil {
      return r.lastErr
  }
  if r.closed {
      return errs.ErrMergerRowsClosed
  }
  rows := r.rowsList[r.cnt]
  if r.valCreator != nil {
      val := r.valCreator.NewPrimitiveValue(dest, r.meta)
      if err := val.SetColumns(rows); err != nil {
          return err
      }
  }
  return rows.Scan(dest...)

}

这是肯定不行的,因为merger应该是一个非常独立的东西。就是 eorm 能用,别的也能用

Stone-afk commented 1 year ago

merger.Merge 之后返回的 ows 是不能当成普通的 rows 来用,要么就是 sql.Rows 和 merger.Rows 有共同的接口

Stone-afk commented 1 year ago

@flycash 仔细回忆一下 value 的实现

type Value interface {
    // Field 访问结构体字段, name 是字段名
    Field(name string) (reflect.Value, error)
    // SetColumns 设置新值,column 是列名
    // 要注意,val 可能存在被上层复用,从而引起篡改的问题
    SetColumns(rows *sql.Rows) error
}
Stone-afk commented 1 year ago
type primitiveValue struct {
    Value
    val     any
    valType reflect.Type
}

// Field 返回字段值
func (s primitiveValue) Field(name string) (reflect.Value, error) {
    return s.Value.Field(name)
}

// SetColumns 设置列值, 支持基本类型,基于 reflect 与 unsafe Value 封装
func (s primitiveValue) SetColumns(rows *sql.Rows) error {
    switch s.valType.Elem().Kind() {
    case reflect.Struct:
        if scanner, ok := s.val.(sql.Scanner); ok {
            return rows.Scan(scanner)
        }
        return s.Value.SetColumns(rows)
    default:
        return rows.Scan(s.val)
    }
}
Stone-afk commented 1 year ago

目前就是有一个思路就是以 实现Next , Columns 等 sql.Rows 和 Rows 的共有方法为接口作为切入

flycash commented 1 year ago

199