kevinyan815 / gocookbook

go cook book
MIT License
789 stars 167 forks source link

使用WaitGroup, Channel和Context打造一个并发用户标签查询器 #21

Open kevinyan815 opened 4 years ago

kevinyan815 commented 4 years ago

这个例子是使用WaitGroup编排多个goroutine并发完成用户多个标签数据的查询工作。至于每个标签应该怎么查询则由具体的标签查询实现类型实现Picker接口的PickTagValueForUser方法,在方法里实现自己标签的查询逻辑。

下面的Picker接口约定了每个标签查询器必须要实现的两个方法

type Picker interface {
    // 用于查询用户的标签值
    PickTagValueForUser (userId int64, args ... interface{})
    // 通知查询到的标签值
    Notify () <-chan interface{}
}

resolveTagPicker函数用于通过标签名解析出每个对应标签查询器对象。

// 根据标签名解析出对应的TagPicker
func resolveTagPicker (tagName string) Picker {
    switch tagName {
    case TAG_ORDER_NUM: // 这是个常量 值是字符串order_num 
        return &OrderNumTagPicker{
            TagName: tagName,
            ValueCh: make(chan interface{}),
        }
    default:
        return nil
    }
}

下面是对外提供的对多个用户标签进行并发查询的查询方法,BulkQueryUserTagValue会根据要查询的多个用户标签每个都开启一个goroutine 执行queryTagValue方法。 queryTagValue方法就是每个标签查询器执行的goroutine,它会根据标签标识实例化出相应的标签查询器,然后再开启一个goroutine执行标签查询器实现的PickTagValueForUser方法,查询到后会通过标签查询器的Channel获得标签值。这里再开一个goroutine去执行PickTagValueForUser方法的原因是要做好查询器的超时处理。 queryTagValue会同时接收标签查询器查询到的结果值(通过Notify方法返回的Channel)和ctx.Done() 这个Channel。 如果在ctx.Done()通道接收到值时还没有从查询器的Channel接收到标签值,则视为超时。

// 对外提供的批量查询用户标签值的方法
func BulkQueryUserTagValue(tagNames []string, userId int64, queryArgs ...interface{}) (tagValuePairs []*TagValuePair) {
    tagCount := len(tagNames)
    if tagCount < 1 {
        return
    }
    wg := &sync.WaitGroup{}
    wg.Add(tagCount)
    tagValueCh := make(chan *TagValuePair, tagCount) // 用于接收所有Picker查到的标签值的Channel
    ctx, _ := context.WithTimeout(context.Background(), time.Minute) // 设置执行标签值查找的超时时间
    for _, tagName := range tagNames {
        go queryTagValue(ctx, wg, tagName, userId, tagValueCh, queryArgs...)
    }
    wg.Wait()
    close(tagValueCh) // 先关闭通道 方便下面for range不发生阻塞, 从channel中读完值即退出
    tagValuePairs = make([]*TagValuePair, 0)
    for tagValue := range tagValueCh {
        if tagValue.Value != nil {
            tagValuePairs = append(tagValuePairs, tagValue)
        }
    }

    return tagValuePairs
}

type TagValuePair struct {
    Name string `json:"tag_name"`
    Value interface{} `json:"tag_value"`
}

func queryTagValue(ctx context.Context, wg *sync.WaitGroup, tagName string, userId int64, tagValueCh chan *TagValuePair, queryArgs ...interface{}) {
    defer wg.Done()
    tagPicker := resolveTagPicker(tagName)
    if tagPicker == nil {
        dlog.Error("未识别的业务标签", common.ErrUnknownBusinessTag)
        return
    }
    go tagPicker.PickTagValueForUser(userId, queryArgs...)
    select {
    case <- ctx.Done(): // 超时返回
        return
    case tagValue := <- tagPicker.Notify(): // 接收标签值
        TagValuePair := &TagValuePair{
            Name:  tagName,
            Value: tagValue,
        }
        tagValueCh <- TagValuePair

        return
    }
}

最后就是具体标签查询器的实现了,每个标签都有自己的实现逻辑,下面是OrderNumTagPicker的示例代码:

type OrderNumTagPicker struct {
    TagName string
    ValueCh chan interface{}
}

type TradeNoInfo struct {
    TradeNo string `json:"trade_no"`
}

func (picker *OrderNumTagPicker) PickTagValueForUser(userId int64, args ...interface{}) {
    // 用类型转换得到交易号
        // tradeNo, ok := args[0].(string)
        extInfoJson, _ := args[0].(string)

    if err := json.Unmarshal([]byte(extInfoJson), &TradeNoInfo); err != nil {
                // log.Error自己实现 
        log.Error("PayTotalTagPickerError", "Invalid arg", args[0])
        // 结束执行并通知外部
        picker.ValueCh <- nil
        return
    }
        // 这里就打印下参数值,标签查询的具体逻辑自己实现
        fmt.Println(userId)
        fmt.Println(TradeNo)
    // 查询到的用户的标签 (假设交易号下有10个订单)
    picker.ValueCh <- 10
}

func (picker *OrderNumTagPicker) Notify () <-chan interface{} {
    return picker.ValueCh
}

完整的源代码参考: https://github.com/kevinyan815/gocookbook/tree/master/codes/tag_picker