woshikid / blog

Apache License 2.0
8 stars 1 forks source link

Resilience4j学习笔记 #176

Open woshikid opened 2 years ago

woshikid commented 2 years ago

通用API

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-core</artifactId>
    <version>1.7.1</version>
</dependency>

Try

String result = Try.ofSupplier(() -> "trying")
    .map(v -> "mapping")
    .recover(e -> "fallback")
    .onSuccess(v -> {}) // 异常recover之后为Success
    .onFailure(e -> {}) // 异常recover之后不再触发
    .andThen(v -> {}) // 等同于onSuccess
    .andFinally(() -> {}) // 类似于finally
    .get(); // 异常时抛出

Try.runRunnable(() -> {})
    .onFailure(e -> {}) // 异常时执行
    .andThen(() -> {}) // 无异常时执行
    .andFinally(() -> {}) // 类似于finally
    .get(); // 异常时抛出

SupplierUtils

Supplier supplier = SupplierUtils.recover(() -> "trying", e -> "fallback");
Supplier supplier = SupplierUtils.andThen(() -> "trying", v -> "mapping");
Supplier supplier = SupplierUtils.andThen(() -> "trying", v -> "mapping", e -> "fallback");

断路器

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-circuitbreaker</artifactId>
    <version>1.7.1</version>
</dependency>

CircuitBreakerConfig

//CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.ofDefaults();
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
    .minimumNumberOfCalls(100) // 熔断最小请求数,默认为100
    .slidingWindowSize(100) // 熔断窗口大小,默认为100,单位为请求数或秒
    .slidingWindowType(SlidingWindowType.COUNT_BASED) // 熔断窗口类型,默认为计数,可设为计时
    .waitDurationInOpenState(Duration.ofSeconds(60)) // 熔断时间(到时间后半开),默认为60秒
    .permittedNumberOfCallsInHalfOpenState(10) // 半开尝试请求数,默认为10
    .failureRateThreshold(50) // 熔断错误率,默认为50
    .slowCallDurationThreshold(Duration.ofSeconds(60)) // 慢请求执行时间,默认为60秒
    .slowCallRateThreshold(100) // 熔断慢请求率,默认为100
    .enableAutomaticTransitionFromOpenToHalfOpen() // 熔断超时后自动半开,默认超时后由新请求触发半开
    .recordExceptions(null) // 需要熔断的异常,默认为所有
    .ignoreExceptions(null) // 无需熔断的异常,默认为无
    .build();

CircuitBreakerRegistry

//CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(circuitBreakerConfig);

CircuitBreaker

CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("test");
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("test", circuitBreakerConfig);
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("test");
CircuitBreaker circuitBreaker = CircuitBreaker.of("test", circuitBreakerConfig);

使用CircuitBreaker

String result = circuitBreaker.executeSupplier(() -> "trying");
String result = CircuitBreaker.decorateSupplier(circuitBreaker, () -> "trying").get();
String result = Try.ofSupplier(CircuitBreaker.decorateSupplier(circuitBreaker, () -> "trying")).get();

// 手动记录异常/成功调用
circuitBreaker.onError(0, TimeUnit.SECONDS, new RuntimeException());
circuitBreaker.onSuccess(0, TimeUnit.SECONDS);

// 手动切换状态/重置
circuitBreaker.transitionToOpenState();
circuitBreaker.transitionToHalfOpenState();
circuitBreaker.transitionToForcedOpenState();
circuitBreaker.transitionToClosedState();
circuitBreaker.transitionToDisabledState();
circuitBreaker.reset();

// 获得状态
State state = circuitBreaker.getState(); // enum
Metrics metrics = circuitBreaker.getMetrics();
metrics.getFailureRate();
metrics.getSlowCallRate();
metrics.getNumberOfFailedCalls();
metrics.getNumberOfSlowCalls();

// 监听事件
circuitBreaker.getEventPublisher()
    .onCallNotPermitted(e -> {})
    .onFailureRateExceeded(e -> {})
    .onSlowCallRateExceeded(e -> {})
    .onStateTransition(e -> {})
    .onEvent(e -> {});

限流器

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

RateLimiterConfig

//RateLimiterConfig rateLimiterConfig = RateLimiterConfig.ofDefaults();
RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
    .limitForPeriod(50) // 限流请求数,默认为50
    .limitRefreshPeriod(Duration.ofNanos(500)) // 限流周期,默认为500纳秒
    .timeoutDuration(Duration.ofSeconds(5)) // 等待超时,默认为5秒
    .build();

RateLimiterRegistry

//RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.ofDefaults();
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(rateLimiterConfig);

RateLimiter

RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test");
RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test", rateLimiterConfig);
RateLimiter rateLimiter = RateLimiter.ofDefaults("test");
RateLimiter rateLimiter = RateLimiter.of("test", rateLimiterConfig);

使用RateLimiter

String result = rateLimiter.executeSupplier(() -> "trying");
String result = RateLimiter.decorateSupplier(rateLimiter, () -> "trying").get();
String result = Try.ofSupplier(RateLimiter.decorateSupplier(rateLimiter, () -> "trying")).get();

// 调整限流参数
rateLimiter.changeLimitForPeriod(10);
rateLimiter.changeTimeoutDuration(Duration.ofSeconds(1));

// 手动限流
rateLimiter.reservePermission(); // 消耗一个请求
rateLimiter.drainPermissions(); // 消耗所有请求

// 获得状态
Metrics metrics = rateLimiter.getMetrics();
metrics.getAvailablePermissions();
metrics.getNumberOfWaitingThreads();

// 监听事件
rateLimiter.getEventPublisher()
    .onFailure(e -> {})
    .onEvent(e -> {});

请求隔离

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>1.7.1</version>
</dependency>

BulkheadConfig

// 信号量
//BulkheadConfig bulkheadConfig = BulkheadConfig.ofDefaults();
BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
    .fairCallHandlingStrategyEnabled(true) // 线程公平(FIFO),默认为true
    .maxConcurrentCalls(25) // 并发执行数,默认为25
    .maxWaitDuration(Duration.ofSeconds(0)) // 等待超时,默认为0(立即报错)
    .build();

// 线程池
//ThreadPoolBulkheadConfig threadPoolBulkheadConfig = ThreadPoolBulkheadConfig.ofDefaults();
ThreadPoolBulkheadConfig threadPoolBulkheadConfig = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(8) // 最大线程数,默认为availableProcessors()
    .coreThreadPoolSize(7) // 核心线程数,默认为availableProcessors() - 1
    .queueCapacity(100) // 等待队列长度,默认为100
    .keepAliveDuration(Duration.ofMillis(20)) // 线程最大空闲时间,默认为20ms
    .build();

BulkheadRegistry

// 信号量
//BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.of(bulkheadConfig);

// 线程池
//ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = ThreadPoolBulkheadRegistry.ofDefaults();
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = ThreadPoolBulkheadRegistry.of(threadPoolBulkheadConfig);

Bulkhead

// 信号量
Bulkhead bulkhead = bulkheadRegistry.bulkhead("test");
Bulkhead bulkhead = bulkheadRegistry.bulkhead("test", bulkheadConfig);
Bulkhead bulkhead = Bulkhead.ofDefaults("test");
Bulkhead bulkhead = Bulkhead.of("test", bulkheadConfig);

// 线程池
ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead("test");
ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead("test", threadPoolBulkheadConfig);
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead.ofDefaults("test");
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead.of("test", threadPoolBulkheadConfig);

使用Bulkhead

// 信号量
String result = bulkhead.executeSupplier(() -> "trying");
String result = Bulkhead.decorateSupplier(bulkhead, () -> "trying").get();
String result = Try.ofSupplier(Bulkhead.decorateSupplier(bulkhead, () -> "trying")).get();

// 线程池
CompletionStage<String> result = threadPoolBulkhead.executeSupplier(() -> "trying");
CompletionStage<String> result = ThreadPoolBulkhead.decorateSupplier(threadPoolBulkhead, () -> "trying").get();

// 调整隔离参数
bulkhead.changeConfig(bulkheadConfig);

// 获得状态
Metrics metrics = bulkhead.getMetrics();
metrics.getAvailableConcurrentCalls();
metrics.getMaxAllowedConcurrentCalls();

Metrics metrics = threadPoolBulkhead.getMetrics();
metrics.getQueueDepth();
metrics.getRemainingQueueCapacity();
metrics.getThreadPoolSize();

// 监听事件
bulkhead.getEventPublisher()
    .onCallRejected(e -> {})
    .onEvent(e -> {});

threadPoolBulkhead.getEventPublisher()
    .onCallRejected(e -> {})
    .onEvent(e -> {});

请求重试

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-retry</artifactId>
    <version>1.7.1</version>
</dependency>

RetryConfig

//RetryConfig retryConfig = RetryConfig.ofDefaults();
RetryConfig retryConfig = RetryConfig.custom()
    .maxAttempts(3) // 默认为3次=初始1次+重试2次
    .waitDuration(Duration.ofMillis(500)) // 退让时间,默认为500ms
    //.intervalFunction(n -> 500L) // 自定义退让函数,与waitDuration二选一
    //.intervalFunction(IntervalFunction.ofExponentialRandomBackoff()) // 指数随机退让
    .retryOnResult(v -> false) // 需要重试的返回值,默认不重试
    .retryOnException(e -> true) // 需要重试的异常,默认全重试
    .retryExceptions() // 需要重试的异常,覆盖retryOnException
    .ignoreExceptions() // 需要忽略的异常,默认为空
    .failAfterMaxAttempts(false) // retryOnResult达到最大次数时抛出异常,默认返回最后一次结果
    .build();

RetryRegistry

//RetryRegistry retryRegistry = RetryRegistry.ofDefaults();
RetryRegistry retryRegistry = RetryRegistry.of(retryConfig);

Retry

Retry retry = retryRegistry.retry("test");
Retry retry = retryRegistry.retry("test", retryConfig);
Retry retry = Retry.ofDefaults("test");
Retry retry = Retry.of("test", retryConfig);

使用Retry

String result = retry.executeSupplier(() -> "trying");
String result = Retry.decorateSupplier(retry, () -> "trying").get();
String result = Try.ofSupplier(Retry.decorateSupplier(retry, () -> "trying")).get();

// 获得状态
Metrics metrics = retry.getMetrics();
metrics.getNumberOfFailedCallsWithoutRetryAttempt();
metrics.getNumberOfFailedCallsWithRetryAttempt();

// 监听事件
retry.getEventPublisher()
    .onRetry(e -> {})
    .onEvent(e -> {});

限时器

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-timelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

TimeLimiterConfig

//TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.ofDefaults();
TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom()
    .timeoutDuration(Duration.ofSeconds(1)) // Future的执行超时时间,默认为1秒
    .cancelRunningFuture(true) // 超时后调用Future.cancel(),默认为true
    .build();

TimeLimiterRegistry

//TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.of(timeLimiterConfig);

TimeLimiter

TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter("test");
TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter("test", timeLimiterConfig);
TimeLimiter timeLimiter = TimeLimiter.ofDefaults("test"); // TimeLimiter.ofDefaults()
TimeLimiter timeLimiter = TimeLimiter.of("test", timeLimiterConfig); // TimeLimiter.of(timeLimiterConfig)
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));

使用TimeLimiter

// 非阻塞调用
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
CompletionStage<String> result = timeLimiter.executeCompletionStage(scheduledExecutorService, () -> CompletableFuture.supplyAsync(() -> "trying"));
CompletionStage<String> result = TimeLimiter.decorateCompletionStage(timeLimiter, scheduledExecutorService, () -> CompletableFuture.supplyAsync(() -> "trying")).get();

// 阻塞调用
String result = timeLimiter.executeFutureSupplier(() -> CompletableFuture.supplyAsync(() -> "trying"));
String result = TimeLimiter.decorateFutureSupplier(timeLimiter, () -> CompletableFuture.supplyAsync(() -> "trying")).call();
String result = Try.ofCallable(TimeLimiter.decorateFutureSupplier(timeLimiter, () -> CompletableFuture.supplyAsync(() -> "trying"))).get();

// 监听事件
timeLimiter.getEventPublisher()
    .onTimeout(e -> {})
    .onEvent(e -> {});

请求缓存(依赖JCache实现)

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-cache</artifactId>
    <version>1.7.1</version>
</dependency>

Cache

CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
Cache<String, String> cache = Cache.of(cacheManager.createCache("cacheName", new MutableConfiguration<>()));

使用Cache

String result = cache.computeIfAbsent("cacheKey", () -> "trying");
String result = Cache.decorateSupplier(cache, () -> "trying").apply("cacheKey");

// 获得状态
Metrics metrics = cache.getMetrics();
metrics.getNumberOfCacheHits();
metrics.getNumberOfCacheMisses();

// 监听事件
cache.getEventPublisher()
    .onCacheMiss(e -> {})
    .onEvent(e -> {});

Spring Boot

POM

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

内部调用AOP

AopContext.currentProxy(); // 需打开expose-proxy

断路器

自定义配置

resilience4j:
  circuitbreaker:
    configs:
      default: # 全局配置
        minimum-number-of-calls: 100 # 熔断最小请求数,默认为100
        sliding-window-size: 100 # 熔断窗口大小,默认为100,单位为请求数或秒
        sliding-window-type: COUNT_BASED # 熔断窗口类型,默认为计数,可设为计时
        wait-duration-in-open-state: 60s # 熔断时间(到时间后半开),默认为60秒
        permitted-number-of-calls-in-half-open-state: 10 # 半开尝试请求数,默认为10
        failure-rate-threshold: 50 # 熔断错误率,默认为50
        slow-call-duration-threshold: 60s # 慢请求执行时间,默认为60秒
        slow-call-rate-threshold: 100 # 熔断慢请求率,默认为100
        automatic-transition-from-open-to-half-open-enabled: false # 熔断超时后自动半开,默认超时后由新请求触发半开
        record-exceptions: [] # 需要熔断的异常,默认为所有
        ignore-exceptions: [] # 无需熔断的异常,默认为无
    instances:
      test: # 指定配置,如不指定则使用全局配置
        base-config: default # 继承的全局配置,如不指定则使用默认配置
        minimum-number-of-calls: 10 # 自定义配置,覆盖全局/默认配置

请求熔断/降级

@CircuitBreaker(name = "test")
@CircuitBreaker(name = "test", fallbackMethod = "fallback") // fallback方法返回值类型需与原方法一致,方法参数需为Throwable或原方法参数加Throwable

限流器

自定义配置

resilience4j:
  ratelimiter:
    configs:
      default: # 全局配置
        limit-for-period: 50 # 限流请求数,默认为50
        limit-refresh-period: 500ns # 限流周期,默认为500纳秒
        timeout-duration: 5s # 等待超时,默认为5秒
    instances:
      test: # 指定配置,如不指定则使用全局配置
        base-config: default # 继承的全局配置,如不指定则使用默认配置
        limit-refresh-period: 1s # 自定义配置,覆盖全局/默认配置

请求限流/降级

@RateLimiter(name = "test")
@RateLimiter(name = "test", fallbackMethod = "fallback") // fallback方法返回值类型需与原方法一致,方法参数需为Throwable或原方法参数加Throwable

请求隔离

自定义配置

resilience4j:
  bulkhead:
    configs:
      default: # 全局配置
        max-concurrent-calls: 25 # 并发执行数,默认为25
        max-wait-duration: 0s # 等待超时,默认为0(立即报错)
    instances:
      test: # 指定配置,如不指定则使用全局配置
        base-config: default # 继承的全局配置,如不指定则使用默认配置
        max-concurrent-calls: 1 # 自定义配置,覆盖全局/默认配置
  thread-pool-bulkhead:
    configs:
      default: # 全局配置
        max-thread-pool-size: 8 # 最大线程数,默认为availableProcessors()
        core-thread-pool-size: 7 # 核心线程数,默认为availableProcessors() - 1
        queue-capacity: 100 # 等待队列长度,不能为0,默认为100
        keep-alive-duration: 20ms # 线程最大空闲时间,默认为20ms
    instances:
      test: # 指定配置,如不指定则使用全局配置
        base-config: default # 继承的全局配置,如不指定则使用默认配置
        core-thread-pool-size: 1 # 自定义配置,覆盖全局/默认配置

请求隔离/降级

@Bulkhead(name = "test")
@Bulkhead(name = "test", fallbackMethod = "fallback") // fallback方法返回值类型需与原方法一致,方法参数需为Throwable或原方法参数加Throwable
@Bulkhead(name = "test", type = Type.THREADPOOL) // 线程池异步执行,必须返回CompletableFuture

请求重试

自定义配置

resilience4j:
  retry:
    configs:
      default: # 全局配置
        max-attempts: 3 # 默认为3次=初始1次+重试2次
        wait-duration: 500ms # 退让时间,默认为500ms
        #enable-exponential-backoff: false # 开启指数退让,默认为false
        #exponential-backoff-multiplier: 1.5 # 退让乘数,默认为1.5
        #exponential-max-wait-duration: null # 最大指数退让时间,默认为空
        #enable-randomized-wait: false # 开启随机退让,默认为false
        #randomized-wait-factor: 0.5 # 随机乘数,默认为0.5
        retry-exceptions: [] # 需要重试的异常,默认全重试
        ignore-exceptions: [] # 需要忽略的异常,默认为空
    instances:
      test: # 指定配置,如不指定则使用全局配置
        base-config: default # 继承的全局配置,如不指定则使用默认配置
        max-attempts: 2 # 自定义配置,覆盖全局/默认配置

或使用Java配置

@Bean
public RetryConfigCustomizer testCustomizer() {
    return RetryConfigCustomizer.of("test", builder -> builder.retryOnResult(v -> false)); // yml中必须配置instances.test
}

请求重试/降级

@Retry(name = "test")
@Retry(name = "test", fallbackMethod = "fallback") // fallback方法返回值类型需与原方法一致,方法参数需为Throwable或原方法参数加Throwable

限时器

自定义配置

resilience4j:
  timelimiter:
    configs:
      default: # 全局配置
        timeout-duration: 1s # Future的执行超时时间,默认为1秒
        cancel-running-future: true # 超时后调用Future.cancel(),默认为true
    instances:
      test: # 指定配置,如不指定则使用全局配置
        base-config: default # 继承的全局配置,如不指定则使用默认配置
        timeout-duration: 10s # 自定义配置,覆盖全局/默认配置

请求限时/降级

@TimeLimiter(name = "test") // 阻塞调用,必须返回CompletableFuture
@TimeLimiter(name = "test", fallbackMethod = "fallback") // fallback方法返回值类型需与原方法一致,方法参数需为Throwable或原方法参数加Throwable

AOP优先级

多个注解组合使用时,需要注意优先级

默认顺序

@Retry(@CircuitBreaker(@RateLimiter(@TimeLimiter(@Bulkhead()))))

自定义顺序

resilience4j:
  retry:
    retry-aspect-order: 1
  circuitbreaker:
    circuit-breaker-aspect-order: 2
  ratelimiter:
    rate-limiter-aspect-order: 3
  timelimiter:
    time-limiter-aspect-order: 4
  #bulkhead默认最低,无法修改

Actuator

开启CircuitBreaker与RateLimiter的health状态

management.health.circuitbreakers.enabled: true
management.health.ratelimiters.enabled: true

resilience4j:
  circuitbreaker:
    configs:
      default:
        register-health-indicator: true
  ratelimiter:
    configs:
      default:
        register-health-indicator: true