lzh2nix / articles

用 issue 来管理个人博客
https://github.com/lzh2nix/articles
61 stars 13 forks source link

一周一package(2024版本) #172

Open lzh2nix opened 2 months ago

lzh2nix commented 2 months ago

Content

lzh2nix commented 1 month ago

001 failsafe-go(2024W30)

https://github.com/failsafe-go/failsafe-go

熔断, 限流,降级 可谓是后端高可用的三件套, 故障复盘基本都会归结到这三点+告警缺失. failsafe-go 是个把相关组件封装的比较好的packege, 主要提供了以下几个package.

推荐的执行顺序是:

image

其也符合高可用的目标: 熔断—>重试 —> 降级. 从上面的结构可以看出这里的关键是最内层的func执行完之后基于一定的policy去做上层的执行, 具体 Policy 的接口如下:

type FailurePolicyBuilder[S any, R any] interface {
    // HandleErrors specifies the errors to handle as failures. Any errors that evaluate to true for errors.Is and the
    // execution error will be handled.
    HandleErrors(errors ...error) S

    // HandleResult specifies the results to handle as failures. Any result that evaluates to true for reflect.DeepEqual and
    // the execution result will be handled. This method is only considered when a result is returned from an execution, not
    // when an error is returned.
    HandleResult(result R) S

    // HandleIf specifies that a failure has occurred if the predicate matches the execution result or error.
    HandleIf(predicate func(R, error) bool) S

    // OnSuccess registers the listener to be called when the policy determines an execution attempt was a success.
    OnSuccess(listener func(ExecutionEvent[R])) S

    // OnFailure registers the listener to be called when the policy determines an execution attempt was a failure, and may
    // be handled.
    OnFailure(listener func(ExecutionEvent[R])) S
}

通过定制特定的policy 可以实现很灵活的定制. 下面就对每种模式做一个简单的整理.

Retry

重试这里最关键的一点是避免无脑的定时重试, 这种很容易演变成ddos 攻击. 在retry policy 可以通过

WithMaxRetries, WithBackoff 来指定最大重试次数和重试策略(这里比较推荐的是backoff算法).

// Retry on ErrConnecting up to 3 times with a 1 second delay between attempts
retryPolicy := retrypolicy.Builder[Connection]().
  HandleErrors(ErrConnecting).
  WithDelay(time.Second).
  WithMaxRetries(3).
  Build()

// Get with retries
connection, err := failsafe.Get(Connect, retryPolicy)

Circuit Breaker

熔断的目的就是在系统超过负载(已经返回了错误)的时候在请求侧做一层保护,避免系统进一步的恶化, failsafe-go 提供了两种模式的熔断 time basedcounter based. 当最近请求失败超过阈值进入熔断器 Opened 的状态, 再过一段时间之后进入half-open 状态, 这种状态下会处理一部分请求, 如果继续成功就进入closed 状态(全量放行), 如果在half-open 状态检测到失败就继续回到open 状态.

Closed——> Opened —delay—> Half-opened

// Opens after 5 failures, half-opens after 1 minute, closes after 2 successes
breaker := circuitbreaker.Builder[any]().
  HandleErrors(ErrSending).
  WithFailureThreshold(5).
  WithDelay(time.Minute).
  WithSuccessThreshold(2).
  Build()

// Run with circuit breaking
err := failsafe.Run(SendMessage, breaker)

failsafe-go 也是提供了比较灵活的配置:


// 最近5个请求,3个失败进入open 状态
builder.WithFailureThresholdRatio(3, 5)

// 一分钟内有三个失败进入open 状态
builder.WithFailureThresholdPeriod(3, time.Minute)

// 1分钟内 20个请求, 其中5个失败, 进入open 状态
builder.WithFailureRateThreshold(20, 5, time.Minute)

// Half-open 状态, 持续30s
builder.WithDelay(30*time.Second)

//在half-open 状态 进入close 状态的配置, 否则再次进入open
builder.WithSuccessThreshold(5)
builder.WithSuccessThresholdRatio(3, 5)

Rate Limiter

熔断是检测到异常之后出于保护系统的目的俩降低请求数, 限流的目的则是避免系统进入overload 状态. failsafe-go 提供了两种模式的ratelimit, smooth 模式和bursty 模式.

// 1s 100个请求, 也就是10ms 1个请求, 保证100个请求在1s内相对平滑, 10ms 之内超过1个则进行限制
limiter := ratelimiter.Smooth(100, time.Second)

// 1s 最多处理10个请求(可以是瞬时的), 超过10个则限制
limiter := ratelimiter.Bursty(10, time.Second)

默认情况下达到ratelimit 之后会直接返回 ratelimiter.ErrExceeded, 不过你也可以配置一个最大wait间隔, 使其后面有机会执行:

// Wait up to 1 second for execution permission
builder.WithMaxWaitTime(time.Second)

TimeOut

timeout 机制基本就是context 的Timeout机制差不多,不同点在用 failsafe里你可以和其他机制打配合.

// Timeout after 10 seconds
timeout := timeout.With[any](10*time.Second)
err := failsafe.Run(Connect, timeout)

Fallback

在有主备的地方使用Fallback 就比较合适, 主挂了, 使用备

connection, err := failsafe.Get(Connect, fallback)

Hedge

类似赛马机制, 同时放n个请求到后端, 以第一个返回左右结果, 其他的request 直接cancel掉, 这种模式虽然会提高相应速度但是会增加后端整体的负载, 在使用时还需谨慎.

// Hedge up to 2 times with a 1 second delay between attempts
hedgePolicy := hedgepolicy.BuilderWithDelay[any](time.Second).
  WithMaxHedges(2).
  Build()

// Run with hedges
err := failsafe.Run(SendRequest, hedgePolicy)

Bulkhead

并发控制, 尤其是go func() xxx 只需要控制在10个 goroutine的场景

// Permit 10 concurrent executions
bulkhead := bulkhead.With[any](10)
err := failsafe.Run(SendRequest, bulkhead)

// 默认超过10个之后会立马返回错误(ErrFull), 当然这里可以设置wait time
bulkhead := bulkhead.Builder[any](10).
  WithMaxWaitTime(time.Second).
  Build()

这里maxWaitTimeout 也是很容易遇到坑, for loop 限制 go routine 并发的数量多场景还是推荐其他package.

Cache

不是特别推荐, 有更好的package.

总结

整理来看除 Cache, Bulkhead 之外的几个机制都可以尝试. 整体代码使用范型, 通用性也比较强.

lzh2nix commented 1 month ago

002 conc(2024W32)

https://github.com/sourcegraph/conc

golang 确实是入门很快, 精通很难的语言. 尤其是在并发场景下很容易写出go routine泄漏的代码, souregraph 的这个库的目标也是和这个相关:

这三点老手写代码的时候也是很容易犯错, 通过一个 package 来规范这些也是日常所需. conc在处理这块儿代码确实很简洁(少即是多, 减少犯错的空间):

image

通过pool来控制并发就更方便了:

func fetchLastNames_pool(ctx context.Context, firstNames []string) ([]string, error) {
    p := pool.NewWithResults[string]().WithContext(ctx)
    for _, firstName := range firstNames {
        firstName := firstName
        p.Go(func(ctx context.Context) (string, error) {
            return fetchLastName(ctx, firstName)
        })
    }
    return p.Wait()
}

使用Iter 就更丝滑:

func fetchLastNames2(ctx context.Context, firstNames []string) ([]string, error) {
    return iter.MapErr(firstNames, func(firstName *string) (string, error) {
        return fetchLastName(ctx, *firstName)
    })
}

不用太多冗余的代码(控制写入的并发控制), 也可以通过自定义MapIter的方式控制并发数量:

input := []int{1, 2, 3, 4}
mapper := Mapper[int, bool]{
    MaxGoroutines: len(input) / 2,
}

results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)

另外也提供了通过stream模式来请求数据(异步网络请求+尽快将结果显示给用户的场景):

func streamFileContents(ctx context.Context, fileNames <-chan string, fileContents chan<- string) {
    s := stream.New()
    for fileName := range fileNames {
        fileName := fileName
        s.Go(func() stream.Callback {
            contents := fetchFileContents(ctx, fileName)
            return func() { fileContents <- contents }
        })
    }
    s.Wait()
}

总结: pool, iter, stream 基本能满足go routine的并发控制

lzh2nix commented 1 month ago

003 otter(2024W33)

https://github.com/maypok86/otter

缓存作为一种提高性能的利器, 在各种性能优化场合都是广泛提及. 也是有各种实现:

缓存淘汰方面基本上也是依赖LRU做缓存淘汰策略. 而LRU基本都是通过一个双链表+map 实现, 其在多读的情况性能相对比较差(缓存命时移动到表头, 也伴随着锁):

image

于是有了基于FIFO的各种改进(ARC,2Q,LIRS, TinyLFU), s3-fifo 基于观测数据(大部份缓存数据实际上只访问了一次)重新设计的一种缓存淘汰策略. 将这部分key快速从缓存中淘汰掉.

image

通过FIFO 避免了链表的复杂操作, 元素在插入时先放small FIFO这样可以快速淘汰(基于之前的统计72%的元素只访问一次).

otter 就是其中一个实现, 其在75% read, 25% writer 的场景下性能很很多:

image

在接口层面使用也是比较简单(构建/读写):

 package main

import (
    "fmt"
    "time"

    "github.com/maypok86/otter"
)

func main() {
    // create a cache with capacity equal to 10000 elements
    cache, err := otter.MustBuilder[string, string](10_000).
        CollectStats().
        Cost(func(key string, value string) uint32 {
            return 1
        }).
        WithTTL(time.Hour).
        Build()
    if err != nil {
        panic(err)
    }

    // set item with ttl (1 hour) 
    cache.Set("key", "value")

    // get value from cache
    value, ok := cache.Get("key")
    if !ok {
        panic("not found key")
    }
    fmt.Println(value)

    // delete item from cache
    cache.Delete("key")

    // delete data and stop goroutines
    cache.Close()
}