confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0

Allow customization of the ThreadPoolExecutor #78

Open mauricioszabo opened 3 years ago

mauricioszabo commented 3 years ago

The code here: https://github.com/confluentinc/parallel-consumer/blob/411c925633cda63abb69e17b54ddb90a2c37afd2/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java#L206 creates a factory with some defaults, in particular the default thread factory.

The problem is that in Clojure, libraries are not precompiled as .class files, so some Java classes and interfaces that are emitted from Clojure (for example, protocols and records) are generated while the program is running. Starting from Java 9, however, some thread factories do not share the classloader, so these classes are not visible on the worker threads of the consumer. This makes the library hard, or even impossible, to use with Clojure.

I'll try to put together example code that reproduces this and link it here, so it becomes easier to debug and to check whether the problem is fixed. In the meantime, these are related problems that I found while debugging my code:

Exactly the same problem I'm having: https://stackoverflow.com/questions/55452778/parallelstream-causing-classnotfoundexception-with-jaxb-api

Workaround that I'm trying to apply here: https://stackoverflow.com/questions/49113207/completablefuture-forkjoinpool-set-class-loader/57551188#57551188
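For illustration, this is a minimal sketch of the general kind of workaround discussed above: a thread factory that pins a chosen context classloader on every worker thread, handed to a plain `ThreadPoolExecutor`. The class name `ContextClassLoaderThreadFactory` and the `pc-worker-` thread name prefix are hypothetical, and nothing here is part of the parallel-consumer API; whether the library accepts a custom executor or thread factory is exactly what this issue asks for.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: creates worker threads whose context classloader
// is fixed to the loader supplied by the caller.
final class ContextClassLoaderThreadFactory implements ThreadFactory {
    private final ClassLoader loader;
    private final AtomicInteger counter = new AtomicInteger();

    ContextClassLoaderThreadFactory(ClassLoader loader) {
        this.loader = loader;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Name is illustrative only; the library uses its own naming.
        Thread t = new Thread(task, "pc-worker-" + counter.incrementAndGet());
        // Make classes visible to the given loader resolvable via the
        // thread's context classloader.
        t.setContextClassLoader(loader);
        return t;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the loader of the thread that builds the pool is the
        // one that can see the dynamically generated classes.
        ClassLoader appLoader = Thread.currentThread().getContextClassLoader();

        ExecutorService pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new ContextClassLoaderThreadFactory(appLoader));
        try {
            // Ask a worker which context classloader it sees.
            Future<ClassLoader> seen =
                    pool.submit(() -> Thread.currentThread().getContextClassLoader());
            System.out.println("worker shares loader: " + (seen.get() == appLoader));
        } finally {
            pool.shutdown();
        }
    }
}
```

Fixing the loader in the factory, rather than inside each task, keeps the workaround in one place. It only helps if the code that resolves classes actually consults the thread's context classloader.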

astubbs commented 3 years ago

Hello again @mauricioszabo ! I saw this comment: https://github.com/technomancy/leiningen/issues/2729#issuecomment-784392623 - so is this actually not an issue?

mauricioszabo commented 3 years ago

Hi @astubbs. Maybe for my project it is indeed not an issue :sweat_smile:. I tried to reproduce the error by not using AOT (ahead-of-time compilation in Clojure), but no luck.

I don't know whether this could be an issue for other people. From what I saw on Stack Overflow, people do customize the ThreadPoolExecutor to make their code work with libraries that use reflection. For Clojure, it seems it's not a problem: I'm not completely sure, but since I can't reproduce the error, maybe it really isn't an issue. :thinking:

astubbs commented 3 years ago

Ah ok, I see. Well, how does something like this look? https://github.com/confluentinc/parallel-consumer/compare/master...astubbs:custom-thread-pool