spring-cloud / spring-cloud-circuitbreaker

Spring Cloud Circuit Breaker API and Implementations
Apache License 2.0
328 stars 109 forks source link

Circuit Breaker in reactive application #185

Closed Pokkybeep closed 2 weeks ago

Pokkybeep commented 3 weeks ago

I was trying to implement circuit breaker in my reactive application. But in logs it is always showing state as closed state, even after circuit breaker is triggered as this line is getting triggered -> log.error("Circuit breaker opened", throwable); I want to know how it is changing state it's behaviour in that state ?

Dependency->

implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j:3.1.1'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'io.github.resilience4j:resilience4j-micrometer:2.2.0'

This is my circuitBreakerConfig file->

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.circuitbreaker.resilience4j.ReactiveResilience4JCircuitBreakerFactory;
import org.springframework.cloud.client.circuitbreaker.Customizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Slf4j
@Configuration
public class CircuitBreakerConfiguration {

    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> customerServiceCusomtizer() {
        return factory -> {
            factory.configure(builder -> builder
                    .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build())
                    .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()), "eventService");

            factory.addCircuitBreakerCustomizer(circuitBreaker -> {
                circuitBreaker.getEventPublisher()
                        .onStateTransition(event -> log.info("Circuit breaker {} transitioned from {} to {}",
                                event.getCircuitBreakerName(), event.getStateTransition().getFromState(), event.getStateTransition().getToState()));
            }, "eventService");
        };
    }
}

And I am using it in event consumer class->

import org.springframework.cloud.client.circuitbreaker.ReactiveCircuitBreaker;
import org.springframework.cloud.client.circuitbreaker.ReactiveCircuitBreakerFactory;

@Slf4j
public class EventConsumer {
    @Getter
    @Setter
    private final ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory;
    private final ReactiveCircuitBreaker eventConsumerCircuitBreaker;

    public EventConsumer(KafkaReceiver<String, String> inputEventReceiver, ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory) {
        this.reactiveCircuitBreakerFactory = reactiveCircuitBreakerFactory;
        this.eventConsumerCircuitBreaker = reactiveCircuitBreakerFactory.create("eventService");
    }

    public Disposable consumeMessage() {
        return processRecord()
                .onBackpressureBuffer(BUFFER_SIZE)
                .limitRate(MAX_BATCH_SIZE)
                .doOnSubscribe(subscription -> logCircuitBreakerState())
                .subscribe(record -> {}, error -> log.error("error while consuming event with message {}", error.getMessage()));
    }

    public Flux<EventWrapper> processRecord() {
        Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(inputEventReceiver::receive);
        return receiverRecord.flatMap(this::processMessage);
    }

    private Flux<EventWrapper> processMessage(final ReceiverRecord<String, String> receiverRecord) {
        EventWrapper eventWrapper = messageHelper.fetchDataFromRecord(receiverRecord);
        return processEventThroughChain(eventWrapper, receiverRecord)
                .transform(it -> eventConsumerCircuitBreaker.run(it, throwable -> {
                    log.error("Circuit breaker opened", throwable);
                    logCircuitBreakerState();
                    return Flux.empty(); // Provide fallback logic if needed
                }));
    }

    private Flux<EventWrapper> processEventThroughChain(EventWrapper eventWrapper, ReceiverRecord<String, String> receiverRecord) {
        return eventProcessorChain.processEvent(eventWrapper)
                .filter(EventWrapper::isValid)
                .doOnNext(result -> receiverRecord.receiverOffset().acknowledge()).flux();
    }

    private void logCircuitBreakerState() {
        log.info("Current state of the circuit breaker is: {}", reactiveCircuitBreakerFactory);
    }
}
ryanjbaxter commented 3 weeks ago

Can you provide a complete, minimal, verifiable sample that reproduces the problem? It should be available as a GitHub (or similar) project or attached to this issue as a zip file.