spring-cloud / spring-cloud-circuitbreaker

Spring Cloud Circuit Breaker API and Implementations
Apache License 2.0

Spring Boot ReactiveCircuitBreaker configuration not working #115

Closed cipley closed 2 years ago

cipley commented 3 years ago

I'm using a circuit breaker implementation in my reactive web service built on Spring Boot WebFlux, with the following dependencies in pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.4</version>
</parent>
...
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
    <version>2.0.1</version>
</dependency>

Then I created the circuit breaker beans:

@Configuration
public class NetworkProfileCircuitBreakerConfig {
...
    @Bean("networkProfileCircuitBreakerFactory")
    public ReactiveCircuitBreakerFactory networkProfileCircuitBreakerFactory() {
        return new ReactiveResilience4JCircuitBreakerFactory();
    }

    @Bean 
    public ReactiveCircuitBreaker networkProfileCircuitBreaker(@Qualifier("networkProfileCircuitBreakerFactory") ReactiveCircuitBreakerFactory factory) {
        return factory.create("networkProfileCircuitBreaker");
    }

    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> circuitBreakerCustomizer() {
        return factory -> {
            factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
                    .circuitBreakerConfig(CircuitBreakerConfig.custom()
                            .failureRateThreshold(failureRate)
                            .minimumNumberOfCalls(minimumNumberOfCalls)
                            .slidingWindowSize(slidingWindowSize)
                            .enableAutomaticTransitionFromOpenToHalfOpen()
                            .waitDurationInOpenState(Duration.ofMillis(waitDurationInOpenState))
                            .ignoreExceptions(BadRequestException.class)
                            .build())
                    .timeLimiterConfig(TimeLimiterConfig.custom()
                            .timeoutDuration(Duration.ofMillis(timeLimiter))
                            .build())
                    .build());
            factory.addCircuitBreakerCustomizer(circuitBreaker -> circuitBreaker.getEventPublisher()
                    .onStateTransition(e -> {
                        switch(e.getStateTransition().getToState()) {
                            case CLOSED:
                                log.info("Circuit Breaker is now CLOSED.");
                                break;
                            case HALF_OPEN:
                                log.info("Circuit Breaker is now HALF_OPEN.");
                                break;
                            case OPEN:
                                log.info("Circuit Breaker is now OPEN!");
                                break;
                            case METRICS_ONLY:
                                break;
                            default:
                                break;
                        }
                    }), "circuitBreakerStateTransitionEvents");
        };
    }

}

Then I autowired the ReactiveCircuitBreaker bean into my service to use it in my reactive call:

...
@Service
public class NetworkProfileService {
...
    @Autowired
    private ReactiveCircuitBreaker networkProfileCircuitBreaker;

...
    public Mono<ResponseEntity<NetworkProfileResponse>> getNetworkProfile(NetworkProfileRequest request) {
        return networkProfileCircuitBreaker.run(adapter.getData(request)
                , throwable -> {
                        //Fallback method
        });
    }
}

However, it seems like my ReactiveResilience4JCircuitBreakerFactory is not working properly: the circuit breaker appears to use the default settings instead of my customized settings. I have tried many things, including moving the factory into my service class constructor and moving the configuration to my application.properties or application.yml, to no avail.

Is there anything that I might have missed?

ryanjbaxter commented 3 years ago

Can you provide a complete, minimal, verifiable sample that reproduces the problem? It should be available as a GitHub (or similar) project or attached to this issue as a zip file.

cipley commented 3 years ago

Hi, I'll get back to you once I have prepared a sample; I've been held up at work for some time now.

cipley commented 3 years ago

Sorry for the delay; the sample can be obtained from: https://github.com/cipley/spring-boot-reactive-circuit-breaker

The unit test reproduces my concerns:

java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)
        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:294)
        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:279)
        at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:418)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:119)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

2021-06-18 07:44:02.111 DEBUG 14732 --- [ parallel-3] i.g.r.c.i.CircuitBreakerStateMachine : No Consumers: Event ERROR not published


- I have also configured the circuit breaker to ignore my custom exception (`ServiceUnavailableException`), yet it is still logged as a recorded failure:

2021-06-18 07:44:02.263 DEBUG 14732 --- [ctor-http-nio-4] i.g.r.c.i.CircuitBreakerStateMachine : CircuitBreaker 'sampleCircuitBreaker' recorded an exception as failure:

com.cipley.sample.model.exception.ServiceUnavailableException: null
        at com.cipley.sample.adapter.SampleAdapter.lambda$6(SampleAdapter.java:50)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        |_ checkpoint ⇢ 500 from GET http://localhost:9000?request=test [DefaultWebClient]
Stack trace:
                at com.cipley.sample.adapter.SampleAdapter.lambda$6(SampleAdapter.java:50)
                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
                at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
                at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
                at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
                at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
                at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
                at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
                at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
                at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:416)
                at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:470)
                at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
                at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
                at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
                at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
                at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
                at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
                at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
                at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
                at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
                at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
                at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
                at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
                at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
                at java.base/java.lang.Thread.run(Thread.java:834)

2021-06-18 07:44:02.263 DEBUG 14732 --- [ctor-http-nio-4] i.g.r.c.i.CircuitBreakerStateMachine : No Consumers: Event ERROR not published



It seems that the custom circuit breaker configuration I defined has no effect, though it is still possible that I have missed something.
This more or less represents what I am encountering in my project right now.

ryanjbaxter commented 3 years ago

Sorry, I am not sure how to reproduce the problem. I ran SampleCircuitBreakerTest and all the tests pass. You are using a mixture of JUnit 4 and JUnit 5 in that class; please fix that and describe more clearly how to use the sample to reproduce the problem.

cipley commented 3 years ago

Hi, I wrote the unit test to simulate how the circuit breaker is triggered; by design, the test is expected to pass in the end. That's why I set the log level to DEBUG, so I could see whether my circuit breaker configuration takes effect.

The circuit breaker configuration is in SampleCircuitBreakerConfig.java:

@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> customizer() {
    return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
                .circuitBreakerConfig(CircuitBreakerConfig.custom()
                        .failureRateThreshold(failureRate)
                        .minimumNumberOfCalls(minNumberOfCalls)
                        .slidingWindowSize(slidingWindowSize)
                        .waitDurationInOpenState(Duration.ofMillis(waitDurationOpen))
                        .ignoreExceptions(ServiceUnavailableException.class)
                        .build())
                .timeLimiterConfig(TimeLimiterConfig.custom()
                        .timeoutDuration(Duration.ofMillis(timeLimiter))
                        .build())
                .build());
}

With the application.properties from the test resources, the unit test should have a circuit breaker with:

But the DEBUG log from the unit test run shows that these expectations are not met:

java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)

I suppose this indicates that the timeout duration is still using the default value of 1 second.

2021-06-18 07:44:02.263 DEBUG 14732 --- [ctor-http-nio-4] i.g.r.c.i.CircuitBreakerStateMachine     : CircuitBreaker 'sampleCircuitBreaker' recorded an exception as failure:

com.cipley.sample.model.exception.ServiceUnavailableException: null
        at com.cipley.sample.adapter.SampleAdapter.lambda$6(SampleAdapter.java:50)

And this log indicates that the exception type that is supposed to be ignored is still being recorded as a failure.
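For comparison, the rule that `ignoreExceptions` is documented to follow in Resilience4j can be modeled in plain Java: a throwable that is an instance of an ignored class, or of one of its subclasses, counts as neither a failure nor a success, while with the default configuration every exception is recorded as a failure. This is a toy sketch, not Resilience4j code; `IgnoreRuleSketch`, `Outcome`, the local stand-in `ServiceUnavailableException` and its subclass `GatewayUnavailableException` are all hypothetical. Under this model, the DEBUG line above is what an empty (default) ignore list produces, not the customized one.

```java
import java.util.List;

public class IgnoreRuleSketch {

    /** Hypothetical outcome labels for this toy model (not a Resilience4j type). */
    enum Outcome { RECORDED_AS_FAILURE, IGNORED }

    /** Hypothetical local stand-in for the sample project's exception type. */
    static class ServiceUnavailableException extends RuntimeException { }

    /** Hypothetical subclass, used to show that subclasses are ignored too. */
    static class GatewayUnavailableException extends ServiceUnavailableException { }

    /**
     * Toy classification: a throwable that is an instance of any ignored class
     * (subclasses included) is ignored; anything else counts as a failure.
     */
    static Outcome classify(Throwable error, List<Class<? extends Throwable>> ignored) {
        for (Class<? extends Throwable> type : ignored) {
            if (type.isInstance(error)) {
                return Outcome.IGNORED;
            }
        }
        return Outcome.RECORDED_AS_FAILURE;
    }

    public static void main(String[] args) {
        Throwable error = new ServiceUnavailableException();

        // Default-like configuration: nothing is ignored, so the error is recorded.
        System.out.println(classify(error, List.of())); // RECORDED_AS_FAILURE

        // Customized configuration: ServiceUnavailableException is ignored.
        System.out.println(classify(error, List.of(ServiceUnavailableException.class))); // IGNORED

        // A subclass of an ignored type is ignored as well.
        System.out.println(classify(new GatewayUnavailableException(),
                List.of(ServiceUnavailableException.class))); // IGNORED
    }
}
```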

ryanjbaxter commented 3 years ago

The problem is that you are creating a ReactiveCircuitBreaker bean, and that bean is created before the Customizer is run, so your customization has not been applied to it. I don't know why you are creating a bean for the circuit breaker. I also don't know why you are creating your own ReactiveCircuitBreakerFactory bean; neither of these is necessary.
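The ordering problem can be illustrated with a toy model in plain Java. It is a sketch under stated assumptions, not the Spring Cloud Circuit Breaker or Resilience4j API: `ToyFactory`, `ToyBreaker` and the timeout-only configuration are hypothetical, customizers are modeled as a `java.util.function.Consumer`, and a breaker is assumed to keep the settings it was created with. The 1-second default mirrors Resilience4j's default `TimeLimiterConfig` timeout; the 3-second custom value is made up. A breaker created eagerly, before the customizers run, captures the default, while a breaker obtained from the factory at call time, after the customizers have run, sees the customized value. That is why dropping the eagerly created `ReactiveCircuitBreaker` bean and calling the factory's `create(id)` where the breaker is used should avoid the problem.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CustomizerOrderSketch {

    /** Hypothetical toy breaker: keeps the timeout it was created with. */
    static final class ToyBreaker {
        private final String id;
        private final Duration timeout;

        ToyBreaker(String id, Duration timeout) {
            this.id = id;
            this.timeout = timeout;
        }

        String id() { return id; }

        Duration timeout() { return timeout; }
    }

    /** Hypothetical toy factory whose default configuration customizers can replace. */
    static final class ToyFactory {
        // Assumed default of 1 second, mirroring Resilience4j's TimeLimiterConfig default.
        private Function<String, Duration> defaultTimeout = id -> Duration.ofSeconds(1);

        void configureDefault(Function<String, Duration> timeoutForId) {
            this.defaultTimeout = timeoutForId;
        }

        // The configuration is read once, when the breaker is created.
        ToyBreaker create(String id) {
            return new ToyBreaker(id, defaultTimeout.apply(id));
        }
    }

    public static void main(String[] args) {
        ToyFactory factory = new ToyFactory();

        // A customizer, modeled as a Consumer; the 3-second value is made up.
        List<Consumer<ToyFactory>> customizers = new ArrayList<>();
        customizers.add(f -> f.configureDefault(id -> Duration.ofSeconds(3)));

        // 1. Eager breaker, created before any customizer has run (like the bean in the issue).
        ToyBreaker eager = factory.create("sampleCircuitBreaker");

        // 2. Customizers are applied afterwards.
        customizers.forEach(customizer -> customizer.accept(factory));

        // 3. Breaker obtained from the factory at call time, after customization.
        ToyBreaker atCallTime = factory.create("sampleCircuitBreaker");

        System.out.println(eager.timeout());      // PT1S: the default was captured
        System.out.println(atCallTime.timeout()); // PT3S: the customized value
    }
}
```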

spring-cloud-issues commented 3 years ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

spring-cloud-issues commented 3 years ago

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.