akhileshchg closed 2 months ago
@akhileshchg are you able to resolve the issue with this change?
Nice work, this code is so twisty, not a nice introductory task at all!
As Mayank says, could you add some details on how this fixes the problem, please?
It looks like:
- https://github.com/confluentinc/kafka-rest/blob/1683233421407a61353db7c869c8420441530012/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java#L200 is throwing on close of the InputStream, so the close of the outbound stream never happens and the @injected ProduceAction.produce() never gets a chance to complete. As a result, the Context-level closing that would tidy up the KafkaProducer never occurs. Is that right?
- The exception raised from InputStream.close() isn't an IOException because it comes from the get of the delegate, at https://github.com/confluentinc/kafka-rest/blob/1683233421407a61353db7c869c8420441530012/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java#L68, rather than from the InputStream.close itself (which only throws an IOException).
- The problem is only seen on a totally unused stream: the delegate.get() call fails because the delegate hasn't been set up yet for an unused stream, whereas it returns successfully once the stream has been used.
Is that right?
Yes. You're right.
@ehumber & @akhileshchg thanks for writing down the detailed breakdown of how the thread leak happens.
…clean up failures.
InputStream close handles only IOException. But when there are unused streams, Jersey throws a BadRequestException instead of an IOException. This uncaught exception disrupts the outbound response closure, leaving the producer thread behind.
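
For anyone following along, here is a minimal sketch of the failure shape described above. It is not the actual StreamingResponse code: the class, the `Resource` stand-in, and both close helpers are hypothetical, and an `IllegalStateException` stands in for the Jersey `BadRequestException` so the example has no JAX-RS dependency. It only shows that a `catch (IOException e)` around two sequential closes lets an unchecked exception from the first close skip the second, while a `finally` does not.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in kafka-rest.
class CloseOrderSketch {

  /** Stand-in for a stream whose close() may fail with an unchecked exception. */
  static final class Resource implements Closeable {
    private final String name;
    private final RuntimeException failure; // null means close() succeeds
    private final List<String> log;

    Resource(String name, RuntimeException failure, List<String> log) {
      this.name = name;
      this.failure = failure;
      this.log = log;
    }

    @Override
    public void close() throws IOException {
      if (failure != null) {
        throw failure;
      }
      log.add(name + " closed");
    }
  }

  /** Leaky shape: only IOException is handled, so an unchecked exception escapes. */
  static void closeCatchingIOExceptionOnly(Closeable input, Closeable output) {
    try {
      input.close();
      output.close(); // skipped if input.close() throws anything
    } catch (IOException e) {
      // swallowed in this sketch; real code would log it
    }
  }

  /** Safer shape: the output is closed in finally, whatever input.close() throws. */
  static void closeBoth(Closeable input, Closeable output) {
    try {
      input.close();
    } catch (IOException | RuntimeException e) {
      // swallowed in this sketch; real code would log it
    } finally {
      try {
        output.close();
      } catch (IOException | RuntimeException e) {
        // swallowed in this sketch; real code would log it
      }
    }
  }

  /** Runs one variant against an input whose close() throws an unchecked exception. */
  static List<String> run(boolean safe) {
    List<String> log = new ArrayList<>();
    Resource input =
        new Resource("input", new IllegalStateException("delegate not set up"), log);
    Resource output = new Resource("output", null, log);
    try {
      if (safe) {
        closeBoth(input, output);
      } else {
        closeCatchingIOExceptionOnly(input, output);
      }
    } catch (RuntimeException e) {
      log.add("escaped: " + e.getClass().getSimpleName());
    }
    return log;
  }
}
```

With the first helper the output is never closed and the exception escapes to the caller; with the second the output is still closed. Whether the real fix catches the broader exception or moves the outbound close into a `finally` is up to the change itself; the sketch just makes the ordering problem concrete.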