confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0

ReactorProcessor executes a user-provided function in a thread from the pc-pool rather than in the provided scheduler #793

Closed yevheniisemenov closed 2 weeks ago

yevheniisemenov commented 2 weeks ago

As far as I understand, when I use ReactorProcessor, the user-provided function is expected to run on the scheduler that I provide during initialization. However, it actually runs in a thread from the pc-pool.

For example, this code

ReactorProcessor<Object, Object> processor = new ReactorProcessor<>(options, Schedulers::boundedElastic);

processor.react(poolContext -> {
    System.out.println(Thread.currentThread().getName() + ": hello");
    return Mono.fromRunnable(() -> System.out.println(Thread.currentThread().getName() + ": world"));
});

prints

pc-pool-2-thread-1: hello
boundedElastic-1: world

while the result I expected is

boundedElastic-1: hello
boundedElastic-1: world

This means that with the current implementation, it's possible for users to block threads from the pc-pool, which may not be clear or expected behavior.

Also, if we dive deeper into the react implementation, we can see that it has multiple thread switches: the first by .subscribeOn and the second by .publishOn.

https://github.com/confluentinc/parallel-consumer/blob/1819a8bbfb227243aaa8732bc1eb0740bb476621/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java#L98-L109

However, .subscribeOn only affects the reactive part of the user-defined function (the publisher it returns), not the call to the function itself, which is why the first print in my example is executed in a pc-pool thread.

As for .publishOn, it switches threads again for the doOnNext and onComplete parts, which seems unnecessary to me - we just spend CPU on an extra thread switch here.

To fix this problem, we need to move carefullyRun into the reactive context and remove publishOn. This should be enough.
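To illustrate the idea outside of Reactor, here is a minimal sketch that uses only the JDK's CompletableFuture, with a single-thread executor standing in for the provided scheduler. It is not the actual ReactorProcessor code, and every name in it (DeferredInvocationSketch, userFunction, invokeEagerly, invokeDeferred, the worker-1 thread name) is hypothetical. The eager variant calls the user function on the caller's thread, which mirrors the current behavior; the deferred variant submits the call itself to the worker, which is what moving carefullyRun into the reactive context is meant to achieve. In Reactor terms, the analogous move would presumably be wrapping the call in something like Mono.defer before subscribeOn, but that is an assumption on my part.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DeferredInvocationSketch {

    // Single daemon thread named "worker-1", standing in for the user-provided scheduler.
    static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker-1");
            t.setDaemon(true);
            return t;
        });
    }

    // Hypothetical user function: the synchronous part records the calling thread ("hello"),
    // the returned future records the thread that runs the asynchronous part ("world").
    static CompletableFuture<String[]> userFunction(ExecutorService worker) {
        String helloThread = Thread.currentThread().getName();
        return CompletableFuture.supplyAsync(
                () -> new String[] {helloThread, Thread.currentThread().getName()}, worker);
    }

    // Eager: the user function is invoked on the caller's thread;
    // only the future it returns runs on the worker.
    static String[] invokeEagerly(ExecutorService worker) {
        return userFunction(worker).join();
    }

    // Deferred: the invocation itself is submitted to the worker,
    // so both the synchronous and the asynchronous part run there.
    static String[] invokeDeferred(ExecutorService worker) {
        return CompletableFuture
                .supplyAsync(() -> userFunction(worker), worker)
                .thenCompose(inner -> inner) // flatten the nested future
                .join();
    }

    public static void main(String[] args) {
        ExecutorService worker = newWorker();
        String[] eager = invokeEagerly(worker);
        String[] deferred = invokeDeferred(worker);
        System.out.println("eager:    hello on " + eager[0] + ", world on " + eager[1]);
        System.out.println("deferred: hello on " + deferred[0] + ", world on " + deferred[1]);
        worker.shutdown();
    }
}
```

Run from main, the eager line reports hello on main and world on worker-1, while the deferred line reports both on worker-1. Note that the deferred variant never blocks inside the worker thread, so it is safe even with a single-thread executor.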

I can provide a PR later today if you agree with my proposal.

Thanks!

rkolesnev commented 2 weeks ago

Hi @yevheniisemenov,

Thanks for the detailed explanation of the issue and the PR with a fix. I am not very well versed in the Reactor framework, so the explanation of the problem is very helpful.

Cheers

yevheniisemenov commented 2 weeks ago

@rkolesnev, thanks for merging! I have a follow-up question: when can I expect this commit to land in a release? Thanks!

rkolesnev commented 2 weeks ago

I am actually preparing the release - your PR got included at the last minute, as I thought I would rather include it now than wait for the next release. It should be released today or this week at the latest.

yevheniisemenov commented 2 weeks ago

Sounds great, thank you!

rkolesnev commented 2 weeks ago

Fix merged, closing the issue.