resilience4j / resilience4j

Resilience4j is a fault tolerance library designed for Java8 and functional programming
Apache License 2.0

ContextPropagator is not adding headers to resilience thread #1679

Open hemantkakodia opened 2 years ago

hemantkakodia commented 2 years ago

Resilience4j version: 1.7.0

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>

Spring Boot version: 2.6.6
Java version: 17

We are using resilience4j.thread-pool-bulkhead. In the main thread, our log lines include header values, as shown below:

Main Thread: 2022-05-03 11:10:10||Application_Name=<servicename>||GUID=<traceId>||Audience=<audienceId>||ID-Audience=<audiencename>

But in methods with Resilience4j annotations, those header values (e.g. GUID) are missing from the logs:

Resilience Thread: 2022-05-03 11:10:13||Application_Name=<servicename>||GUID=||Audience=||ID-Audience=|| ForkJoinPool.commonPool-worker-1

So in the Resilience4j thread, these values are not carried over from the main thread.

Below is the configuration I am using for the ContextPropagator, which doesn't seem to work:

application.yaml

resilience4j.thread-pool-bulkhead:
  instances:
    backendA:
      maxThreadPoolSize: 23
      coreThreadPoolSize: 20
      queueCapacity: 100
      writableStackTraceEnabled: false
      contextPropagators:
        - com.config.CustomContextPropagator

Code for Custom Context Propagator: com.config.CustomContextPropagator

@Component
public class CustomContextPropagator implements ContextPropagator<Map<String, String>> {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Supplier<Optional<Map<String, String>>> retrieve() {
        return () -> Optional.ofNullable(MDC.getCopyOfContextMap());
    }

    @Override
    public Consumer<Optional<Map<String, String>>> copy() {
        return (copyOfContextMap) -> copyOfContextMap.ifPresent(MDC::setContextMap);
    }

    @Override
    public Consumer<Optional<Map<String, String>>> clear() {
        return (contextMap) -> MDC.clear();
    }
}
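
For readers following along, the retrieve/copy/clear contract can be sketched with the JDK alone. This is not Resilience4j's implementation: `CONTEXT` is a stand-in for the MDC, and `propagating` is a hypothetical helper that does roughly what a decorated executor would do with a propagator, namely take a snapshot on the calling thread, install it on the worker thread, and clear it afterwards.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ContextPropagationSketch {
    // Stand-in for the logging MDC: a per-thread map (assumption for illustration).
    static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    // Hypothetical helper: wraps a task so that it sees the caller's context.
    static <T> Supplier<T> propagating(Supplier<T> task) {
        // "retrieve": evaluated on the calling thread, at wrap time.
        Map<String, String> current = CONTEXT.get();
        Optional<Map<String, String>> snapshot =
                Optional.ofNullable(current == null ? null : Map.copyOf(current));
        return () -> {
            // "copy": evaluated on the worker thread, before the task runs.
            snapshot.ifPresent(CONTEXT::set);
            try {
                return task.get();
            } finally {
                // "clear": evaluated on the worker thread so pooled threads do not keep stale values.
                CONTEXT.remove();
            }
        };
    }

    // Reads the GUID from the current thread's context, or "" if there is none.
    static String readGuid() {
        Map<String, String> ctx = CONTEXT.get();
        return ctx == null ? "" : ctx.getOrDefault("GUID", "");
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set(Map.of("GUID", "trace-123"));

            Callable<String> plainTask = ContextPropagationSketch::readGuid;
            Supplier<String> wrapped = propagating(ContextPropagationSketch::readGuid);
            Callable<String> propagatedTask = wrapped::get;

            System.out.println("without propagation: GUID=" + pool.submit(plainTask).get());
            System.out.println("with propagation:    GUID=" + pool.submit(propagatedTask).get());
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

The point of the sketch is that a propagator only helps on threads whose tasks are actually wrapped this way. A task that runs on some other pool never goes through the wrapper, so it sees an empty context.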

I would appreciate help.

hemantkakodia commented 2 years ago

@RobWin Hi. Can you please help here?

hemantkakodia commented 2 years ago

@RobWin Need help

RobWin commented 2 years ago

Hi, are you sure it's the Resilience4j thread pool? Can you log thread names? I can see ForkJoinPool.commonPool-worker-1?

The threads from our Bulkhead-Threadpool contain the name of the Bulkhead. Please see: https://github.com/resilience4j/resilience4j/blob/d1dd82149b1a3a1e57338751e728713679d8fe24/resilience4j-core/src/main/java/io/github/resilience4j/core/NamingThreadFactory.java#L36

hemantkakodia commented 2 years ago

Hi @RobWin, thanks for writing back on this issue. Below is the piece of code I am implementing.

import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;

    @Override
    @ElapsedTime(backend = Constant.backendA)
    @Bulkhead(name = Constant.backendA, type = Bulkhead.Type.THREADPOOL)
    @CircuitBreaker(name = Constant.backendA)
    public ProfileResponse retrieveProfile(ProfileRequest request,
                                                             Map<HeaderUtil.HeaderEnum, String> headerMap,
                                                             String guid)
            throws CompletionException, ExecutionException, InterruptedException {
        io.github.resilience4j.timelimiter.TimeLimiter timeLimiter = resilienceTimeLimiterConfig.getTimeLimiter(Constant.backendA);

        Supplier<CompletableFuture<ProfileResponse >> result = () -> CompletableFuture
                .supplyAsync(() ->
                {
                    ProfileResponse profileResponse = null;
                    try {
                        profileResponse  = retrieveProfile(request, headerMap, guid);
                    } catch (JsonProcessingException e) {
                        // Pass the exception itself so the message and stack trace are logged.
                        LOGGER.error("Error while calling POST Profile", e);
                    }
                    return profileResponse;
                });

        return timeLimiter.executeCompletionStage(resilienceTimeLimiterConfig.getScheduledExecutorService(Constant.backendA, env), result).toCompletableFuture().get();
    }

Inside retrieveProfile(), Thread.currentThread().getName() returns the following: ForkJoinPool.commonPool-worker-1

Please let me know if I missed anything. Thanks.

RobWin commented 2 years ago

Hi,

1) If you are using a ThreadPoolBulkhead, your method must always return a CompletionStage or CompletableFuture.
2) If you are using a ThreadPoolBulkhead, you should not spawn another thread in another thread pool, e.g. with CompletableFuture.supplyAsync, because the ThreadPoolBulkhead is already spawning a thread for you. In your case you are spawning 2 threads per request 👎
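
The second point can be reproduced with the JDK alone. The following is a minimal sketch, not Resilience4j code: `namedPool` is a hypothetical stand-in for a bulkhead's thread pool, and the `bulkhead-backendA` prefix is only an illustrative name. It shows that `CompletableFuture.supplyAsync` without an executor argument leaves the named pool and runs on the JDK's default async executor (normally `ForkJoinPool.commonPool()`), even when it is called from a thread of the named pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncThreadNames {
    // Hypothetical stand-in for a bulkhead's pool: threads carry a recognizable name prefix.
    static ExecutorService namedPool(String prefix, int size) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    static String currentThreadName() {
        return Thread.currentThread().getName();
    }

    // Runs on the given pool, then hops to the default async executor via a nested supplyAsync.
    static String nestedSupplyAsync(ExecutorService pool) {
        return CompletableFuture.supplyAsync(
                () -> CompletableFuture.supplyAsync(AsyncThreadNames::currentThreadName).join(),
                pool).join();
    }

    public static void main(String[] args) {
        ExecutorService bulkheadLike = namedPool("bulkhead-backendA", 2);
        try {
            // Explicit executor: the work stays on the named pool.
            String onNamedPool = CompletableFuture
                    .supplyAsync(AsyncThreadNames::currentThreadName, bulkheadLike).join();
            // Nested supplyAsync without an executor: a second thread from another pool does the work.
            String afterNestedHop = nestedSupplyAsync(bulkheadLike);

            System.out.println("explicit executor:  " + onNamedPool);
            System.out.println("nested supplyAsync: " + afterNestedHop);
        } finally {
            bulkheadLike.shutdown();
        }
    }
}
```

Any context installed on the named pool's thread is not visible on the second thread, which matches the empty GUID logged next to ForkJoinPool.commonPool-worker-1 above.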

Either use:

    @Override
    @ElapsedTime(backend = Constant.backendA)
    @Bulkhead(name = Constant.backendA, type = Bulkhead.Type.THREADPOOL)
    @CircuitBreaker(name = Constant.backendA)
    @TimeLimiter(name = Constant.backendA)
    public CompletableFuture<ProfileResponse> retrieveProfile(ProfileRequest request,
                                                             Map<HeaderUtil.HeaderEnum, String> headerMap,
                                                             String guid) {
        return CompletableFuture.completedFuture(retrieveProfile(request, headerMap, guid));
    }

And for context propagation on the TimeLimiter thread, you need this in your config:

resilience4j.scheduled.executor:
  corePoolSize: 10
  contextPropagators:
    - com.config.CustomContextPropagator

saeedansari commented 1 year ago

Hi, I am using Spring Boot + Resilience4j CircuitBreaker without annotations. This is how I configure a CircuitBreaker and TimeLimiter:

@Bean
public Customizer<Resilience4JCircuitBreakerFactory> circuitBreakerFactoryCustomizer(ActorCircuitBreakerProperties actorCircuitBreakerProperties) {
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
                .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
                .slidingWindowSize(actorCircuitBreakerProperties.getSlidingWindowSize())
                .minimumNumberOfCalls(actorCircuitBreakerProperties.getMinimumNumberOfCalls())
                .failureRateThreshold(actorCircuitBreakerProperties.getFailureRateThreshold())
                .slowCallRateThreshold(actorCircuitBreakerProperties.getSlowCallRateThreshold())
                .slowCallDurationThreshold(Duration.ofMillis(actorCircuitBreakerProperties.getSlowCallDurationThreshold()))
                .recordException(isNotActorNotFoundException)
                .build();
        TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom()
                .timeoutDuration(Duration.ofMillis(actorCircuitBreakerProperties.getTimeLimiterTimeoutDuration()))
                .build();
        return factory -> factory.configure(builder -> builder.circuitBreakerConfig(circuitBreakerConfig)
                .timeLimiterConfig(timeLimiterConfig)
                .build(), CB_ACTOR_CLIENT);
}

In application.properties I configured the context propagator:

resilience4j.thread-pool-bulkhead.instances.cb_actor_client.contextPropagator[0]=com.x.circuitbreaker.CustomContextPropagator

and I use the CircuitBreaker like this:

            var result = actorClient.query(
                    SearchActors.builder()
                            .filter(filter)
                            .build());

            return mapActorSearchResults(result.getFragments().getActorSearchResults());
   }, actorSearchFallback.fallback(ActorClientApiCalls.SEARCH_ACTORS));

When I debug, none of the propagator's methods are called. I need to pass the SecurityContext through the propagator. Thank you!

vidhangithub commented 1 year ago

Hi @RobWin. We were also facing this issue, and applying the config below resolved it.

resilience4j.scheduled.executor:
  corePoolSize: 10
  contextPropagators:

But I have a question about the 'corePoolSize' property in the configuration above. Will it bypass the thread-pool-bulkhead coreThreadPoolSize that I have defined for resilience4j.thread-pool-bulkhead, as below?

resilience4j.thread-pool-bulkhead:
  instances:
    my-bulk-head:
      maxThreadPoolSize: 50
      coreThreadPoolSize: 20
      queueCapacity: 20
      keepAliveDuration: 2s
      contextPropagators:

Thanks in advance.

RobWin commented 1 year ago

Yes. If a call must be retried for some reason or times out, we unfortunately cannot use the thread pool bulkhead to schedule the retry attempt or the timeout exception.
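
A JDK-only sketch of why a timeout ends up on a different thread than the bulkhead's worker. It is not how Resilience4j implements its TimeLimiter; the pool names and the `timeoutThreadName` helper are hypothetical. A slow call runs on a worker pool, a separate scheduler fails the future after a short delay, and the callback records which thread delivered the timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSchedulerSketch {
    static Thread daemon(Runnable runnable, String name) {
        Thread thread = new Thread(runnable, name);
        thread.setDaemon(true);
        return thread;
    }

    // Returns the name of the thread on which the timeout was delivered to the callback.
    static String timeoutThreadName(ExecutorService workers, ScheduledExecutorService scheduler) {
        // The slow call occupies a worker thread for about two seconds.
        CompletableFuture<String> call = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "late result";
        }, workers);

        // Register the callback first so it runs on whichever thread completes the future.
        CompletableFuture<String> observedThread = new CompletableFuture<>();
        call.whenComplete((value, error) -> observedThread.complete(Thread.currentThread().getName()));

        // The worker is busy, so the timeout has to be raised from a separate scheduler thread.
        scheduler.schedule(
                () -> call.completeExceptionally(new TimeoutException("timed out")),
                50, TimeUnit.MILLISECONDS);

        return observedThread.join();
    }

    public static void main(String[] args) {
        ExecutorService workers =
                Executors.newFixedThreadPool(1, r -> daemon(r, "bulkhead-worker"));
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> daemon(r, "timeout-scheduler"));
        try {
            System.out.println("timeout delivered on: " + timeoutThreadName(workers, scheduler));
        } finally {
            workers.shutdownNow();
            scheduler.shutdownNow();
        }
    }
}
```

Under these assumptions the scheduler is a pool of its own, sized independently of the bulkhead's pool, and thread-local context has to be propagated to it separately, which is consistent with configuring contextPropagators under resilience4j.scheduled.executor as suggested earlier in the thread.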