typesafehub / akka-contrib-extra

ConductR Akka contributions

InputStreamPublisher blocks such that it may not be shutdown #24

Closed huntc closed 9 years ago

huntc commented 9 years ago

InputStreamPublisher blocks its actor thread when reading from a stream. This causes a problem when the actor system is shut down, as the publisher will never receive its stop message.

The following JVM dump illustrates the thread being blocked. The dump was taken once system.shutdown() and then system.awaitTermination() had been called.

"conductr-akka.actor.default-dispatcher-15" #24 prio=5 os_prio=31 tid=0x00007fddb1af8000 nid=0x6503 runnable [0x0000000115cb1000]
   java.lang.Thread.State: RUNNABLE
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:234)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    - locked <0x00000007b84491b8> (a java.lang.UNIXProcess$ProcessPipeInputStream)
    at java.io.FilterInputStream.read(FilterInputStream.java:107)
    at akka.contrib.stream.InputStreamPublisher$$anonfun$1.apply$mcI$sp(InputStreamPublisher.scala:66)
    at akka.contrib.stream.InputStreamPublisher$$anonfun$1.apply(InputStreamPublisher.scala:66)
    at akka.contrib.stream.InputStreamPublisher$$anonfun$1.apply(InputStreamPublisher.scala:66)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    at scala.concurrent.package$.blocking(package.scala:123)
    at akka.contrib.stream.InputStreamPublisher.read(InputStreamPublisher.scala:66)

viktorklang commented 9 years ago

I suggest doing the reading using something like the following: https://gist.github.com/viktorklang/5409467

Actors shouldn't block. Doing the read that way means that you can interrupt a blocking read that is currently running asynchronously.
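
A minimal Java sketch of keeping the blocking read off the calling thread, so that the caller (an actor, say) stays free to process messages such as a stop request. This is not the code from the linked gist: the class OffThreadReadSketch and its helpers are hypothetical names, and only JDK classes are used. It assumes a dedicated pool is acceptable for blocking reads.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffThreadReadSketch {

    // Hypothetical helper: a pool reserved for blocking reads. Daemon threads
    // are used so that a read that never returns cannot keep the JVM alive.
    public static ExecutorService newBlockingReadPool() {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable, "blocking-read");
            t.setDaemon(true);
            return t;
        });
    }

    // Performs one blocking read on the given pool and completes the future
    // with the bytes read. An empty array signals end of stream.
    public static CompletableFuture<byte[]> readChunk(InputStream in, int maxBytes, Executor pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                byte[] buffer = new byte[maxBytes];
                int n = in.read(buffer); // blocks the pool thread, not the caller
                return n < 0 ? new byte[0] : Arrays.copyOf(buffer, n);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newBlockingReadPool();
        InputStream in = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
        byte[] chunk = readChunk(in, 16, pool).get();
        System.out.println(new String(chunk, StandardCharsets.UTF_8)); // prints "hello"
        pool.shutdown();
    }
}
```

Note that CompletableFuture.cancel does not interrupt the thread running the read, so this only moves the blocking call elsewhere. It does not by itself make the read abortable.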

huntc commented 9 years ago

Can you successfully interrupt a blocking IO read? Where is that documented? Thanks.

http://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html#read(byte[])

viktorklang commented 9 years ago

Now that is an excellent question, @huntc. I thought that was appropriately documented on InputStream, but of course it cannot make such guarantees itself (see: http://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html#read()), and surprisingly it isn't even mentioned in http://docs.oracle.com/javase/7/docs/api/java/io/FileInputStream.html#read().

Having jumped into the rabbit hole, I have now found myself in a 14-year-old bug report: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4514257

So it does seem like interrupting is not possible; however, one may be able to call close() on the InputStream: https://stackoverflow.com/questions/3843363/thread-interrupt-not-ending-blocking-call-on-input-stream-read

huntc commented 9 years ago

Thought of that too. :-) Unfortunately, the close method on an InputStream makes no guarantees of thread safety; thus it should not be called from a thread other than the one doing the read.

viktorklang commented 9 years ago

@huntc It's better to call and wrap in a try-catch than to let Threads hang, I suppose?
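
A minimal Java sketch of the "call close() and wrap it in a try-catch" idea, written as a probe that reports whether a blocked read actually returns after close() is called from another thread. The class CloseToUnblockSketch and both method names are hypothetical. The stream used in main is backed by a java.nio Pipe, chosen only because its read blocks until data arrives; whether close() unblocks a read on any other InputStream (a process pipe, for example) is exactly what is in question here and is not shown.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CloseToUnblockSketch {

    // Best-effort close: returns false instead of propagating a failure.
    public static boolean closeQuietly(InputStream in) {
        try {
            in.close();
            return true;
        } catch (IOException | RuntimeException e) {
            return false;
        }
    }

    // Starts a reader thread, closes the stream from the calling thread, and
    // reports whether the read finished within the timeout. Assumes the stream
    // has no data and no end of stream to deliver on its own.
    public static boolean unblocksOnClose(InputStream in, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch readFinished = new CountDownLatch(1);
        Thread reader = new Thread(() -> {
            try {
                in.read(new byte[16]);
            } catch (IOException | RuntimeException e) {
                // expected when close() aborts the read
            } finally {
                readFinished.countDown();
            }
        }, "probe-reader");
        reader.setDaemon(true); // a read that never returns must not pin the JVM
        reader.start();

        Thread.sleep(100); // give the reader a chance to block first
        closeQuietly(in);
        return readFinished.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Pipe pipe = Pipe.open(); // nothing is written to the sink
        InputStream in = Channels.newInputStream(pipe.source());
        System.out.println("read returned after close: " + unblocksOnClose(in, 2000));
        pipe.sink().close();
    }
}
```

A false result from the probe means the reader thread is still stuck, which is why the reader is a daemon thread.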

huntc commented 9 years ago

Still no guarantees though...

viktorklang commented 9 years ago

@huntc There are no guarantees that the hardware will work either.

viktorklang commented 9 years ago

FileChannel implements InterruptibleChannel: http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html
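
A minimal Java sketch of what InterruptibleChannel provides: a thread blocked in a channel read receives ClosedByInterruptException when it is interrupted, and the channel is closed. The sketch uses the source end of a java.nio Pipe rather than a FileChannel, only because a read on an empty pipe blocks reliably. The class and method names are hypothetical, and this does not show how to obtain such a channel for a process's output stream.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptibleReadSketch {

    // Blocks a helper thread in a channel read, interrupts it, and reports
    // how the read ended.
    public static String interruptBlockedRead() throws Exception {
        Pipe pipe = Pipe.open(); // nothing is ever written, so read() blocks
        AtomicReference<String> outcome = new AtomicReference<>("still blocked");
        CountDownLatch done = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            try {
                int n = pipe.source().read(ByteBuffer.allocate(16));
                outcome.set("read returned " + n);
            } catch (ClosedByInterruptException e) {
                outcome.set("ClosedByInterruptException");
            } catch (IOException e) {
                outcome.set("IOException: " + e);
            } finally {
                done.countDown();
            }
        }, "channel-reader");
        reader.setDaemon(true);
        reader.start();

        Thread.sleep(100);   // let the reader block; an earlier interrupt has the same effect
        reader.interrupt();  // closes the channel and aborts the read
        done.await(5, TimeUnit.SECONDS);
        pipe.sink().close();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptBlockedRead()); // prints "ClosedByInterruptException"
    }
}
```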

hseeberger commented 9 years ago

Hmm ... FileChannel looks useless here. Maybe implement AbstractInterruptibleChannel.

viktorklang commented 9 years ago

…ProviderFactoryBeanServiceVisitor