platinummonkey / go-concurrency-limits

Go implementation of Netflix/concurrency-limits
Apache License 2.0
115 stars 20 forks source link

No examples tell me how to use LookupPartitionStrategy&PredicatePartitionStrategy #109

Open xiaofeige opened 1 year ago

xiaofeige commented 1 year ago

any examples?
why partitions cann't not be group by grpc method?

platinummonkey commented 1 year ago

You might look at the tests for current examples:

https://github.com/platinummonkey/go-concurrency-limits/blob/master/strategy/lookup_partition_test.go https://github.com/platinummonkey/go-concurrency-limits/blob/master/strategy/predicate_partition_test.go

The per method is interesting. Currently this https://github.com/platinummonkey/go-concurrency-limits/blob/master/grpc/grpc_unary.go#L11 applies to all methods but could easily obtain the method name (and context args for something even more custom) with your own interceptor. The per method name seems like a relatively good FR that can be generalized

xiaofeige commented 1 year ago

` func (i *InterceptorManager) GetAutoRateLimiter() lwp.ServerInterceptor {

emptyLimitFunc := func(ctx context.Context, req *msgpack.ArgsCodec, info *lwp.ServerInfo, handler lwp.UnaryHandler) (resp interface{}, err error) {
    return handler(ctx, req)
}

if !fileConf.Config.FileServerSettting.StartAutoRateLimit {
    return emptyLimitFunc
}

partitions := make(map[string]*strategy.LookupPartition)
partitions["default"] = strategy.NewLookupPartitionWithMetricRegistry("default", 0.5, 100, limitCore.EmptyMetricRegistryInstance)
partitions["/r/Icon/genAutomaticIcon"] = strategy.NewLookupPartitionWithMetricRegistry("/r/Icon/genAutomaticIcon", 0.2, 10, limitCore.EmptyMetricRegistryInstance)

fileLimitStrategy, err := strategy.NewLookupPartitionStrategyWithMetricRegistry(
    partitions,
    func(ctx context.Context) string {
        rpcCtx, ok := lwputil.RpcContextFromContext(ctx)
        if !ok {
            return "default"
        }

        if _, ok = partitions[rpcCtx.Url]; !ok {
            return "default"
        }

        return rpcCtx.Url
    },
    1000,
    limitCore.EmptyMetricRegistryInstance,
)

tags := make([]string, 0)
autoLimiter, err := limiter.NewDefaultLimiterWithDefaults(
    "file",
    fileLimitStrategy,
    &log.RateLimitLogger{},
    limitCore.EmptyMetricRegistryInstance,
    tags...,
)

if err != nil || autoLimiter == nil {
    log.Logger.Errorf("create auto limiter err:%v", err)
    return emptyLimitFunc
}

go func() {
    for {
        time.Sleep(1 * time.Second)
        log.Logger.Infof("[rate_limit_state]: %s estimate:%d", autoLimiter.String(), autoLimiter.EstimatedLimit())
    }
}()

classifyRspCode := func(url string, err error) string {
    if err == nil {
        return "success"
    }

    lwpStat, ok := err.(*status.LwpStatus)
    if !ok {
        log.Logger.Infof("convert lwp error failed, err:%T", err)
        return "ignore"
    }

    if lwpStat.LwpCode() == 408 || lwpStat.LwpCode() >= 500 {
        return "drop"
    }

    return "ignore"
}

return func(ctx context.Context, args *msgpack.ArgsCodec, info *lwp.ServerInfo, handler lwp.UnaryHandler) (resp interface{}, err error) {

    rpcCtx, ok := lwputil.RpcContextFromContext(ctx)
    if !ok {
        return nil, status.NewLwpBadRequestErrorWithScope(status.LwpDefaultLang, lwputil.BizCodeRpcContextMissing,
            "RpcContext_empty", "", lwputil.GetBizScope("AuthInterceptor"))
    }

    token, ok := autoLimiter.Acquire(ctx)
    if !ok {
        log.LogFmt.CommonErrorLog(ctx, "rate_limit", fmt.Errorf("auto_rate_limit"), map[string]interface{}{
            "uid": rpcCtx.Uid,
            "did": rpcCtx.Did,
        })
        return nil, status.NewLwpBadRequestErrorWithScope(status.LwpDefaultLang, fmt.Sprint(types.ErrTooManyRequest),
            "服务繁忙,请稍后重试", "", lwputil.GetBizScope("AuthInterceptor"))
    }
    defer func() {
        rspType := classifyRspCode(rpcCtx.Url, err)

        switch rspType {
        case "success":
            token.OnSuccess()
        case "ignore":
            token.OnIgnore()
        case "drop":
            token.OnDropped()
        default:
            token.OnIgnore()
        }
    }()

    return handler(ctx, args)
}

} `

this is how I code, but it seems like the token wasn't released correctly. In my presure test, all the request will be forbinden finally, and it won't recover unless I restart the server, even if I stop the presure test for a long time.

image