illuin-tech / data-pipeline

Toolkit for describing data transformation pipelines by compositing simple reusable components.
MIT License
5 stars 0 forks source link

Concurrency: investigate using ThreadPoolExecutor.CallerRunsPolicy as a default for resilience components #27

Open eledhwen opened 7 months ago

eledhwen commented 7 months ago

Several resilience4j rely on one or several thread-pools (via ServiceExecutor) for workload management. For instance the time-limiter will submit workloads to a thread-pool, and monitor their execution time. If they go over the time-limit, the future is cancelled (or let go) and the wrapper will throw a TimeLimiterException so as to let another workload to be processed.

It can happen that the rate at which we subject the thread-pool to new workloads outgrows the rate at which it completes and/or cancels them. When that happens, the ServiceExecutor will reject the workload, and proceed according to a policy specified by a RejectedExecutionHandler.

The default behaviour for all ServiceExecutor is to use the ThreadPoolExecutor.AbortPolicy implementation of this handler, which will throw a RejectedExecutionException. But in our case it could be interesting to at least offer the option of choosing the ThreadPoolExecutor.CallerRunsPolicy, which will make it so the thread handling the ServiceExecutor will run the workload itself (and thus, it will block submissions temporarily, possibly helping with managing workload pressure).

eledhwen commented 7 months ago

This scenario has been encountered in a project with a time-limiter put in front of a fairly active Kafka topic.

It has been circumvented by doing the following (so it's already possible to tweak the behaviour):

//Apart from the policy, this is identical to what one would get with Executors.newFixedThreadPool() 
var executor = new ThreadPoolExecutor(
  parallelismFactor,
  parallelismFactor,
  config.fastLane().executorParallelism(),
  0L, TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue<>(),
  new ThreadPoolExecutor.CallerRunsPolicy()
);

//[...]
new TimeLimiterWrapper<>(timeLimiterConfig, executor);