clj-commons / aleph

Asynchronous streaming communication for Clojure - web server, web client, and raw TCP/UDP
http://aleph.io
MIT License

InputStream passed as a body gets closed by a different thread when the client closes the connection prematurely #535

Open mikroskeem opened 4 years ago

mikroskeem commented 4 years ago

Reproduction is pretty simple:

1. Get yourself an InputStream (in my case, one from the Minio SDK when getting an object from storage, which originates from OkHttp).
2. Pass it into the response body: {:body my-is}
3. Make the client close the connection prematurely (uh-oh, a binary file being downloaded with curl into the terminal; by default curl does not allow that).
4. Aleph closes the InputStream on a different thread.

I ran into the following exception (OkHttp, or rather okio, which backs said stream, does not support this use case):

{host 127.0.0.1:7000, user-agent curl/7.68.0, accept */*}
read3       Thread[manifold-wait-6,5,main]
available   Thread[manifold-wait-6,5,main]
read3       Thread[manifold-wait-6,5,main]
available   Thread[manifold-wait-6,5,main]
read3       Thread[manifold-wait-6,5,main]
read3       Thread[manifold-wait-6,5,main]
available   Thread[manifold-wait-6,5,main]
read3       Thread[manifold-wait-6,5,main]
close       Thread[aleph-netty-server-event-pool-23,5,main]
Feb 16, 2020 8:45:12 PM manifold.utils invoke
SEVERE: error in invoke-callbacks
java.lang.IllegalStateException: Unbalanced enter/exit
    at okio.AsyncTimeout.enter(AsyncTimeout.java:73)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:235)
    at okio.RealBufferedSource.read(RealBufferedSource.java:51)
    at okhttp3.internal.http1.Http1Codec$AbstractSource.read(Http1Codec.java:374)
    at okhttp3.internal.http1.Http1Codec$FixedLengthSource.read(Http1Codec.java:418)
    at okhttp3.internal.Util.skipAll(Util.java:204)
    at okhttp3.internal.Util.discard(Util.java:186)
    at okhttp3.internal.http1.Http1Codec$FixedLengthSource.close(Http1Codec.java:435)
    at okio.RealBufferedSource.close(RealBufferedSource.java:476)
    at okio.RealBufferedSource$1.close(RealBufferedSource.java:460)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:438)
    at cu.resourcepack_server.core$fn__28582$fn__28595$fn__28605.invoke(form-init15695820198716781853.clj:36)
    at cu.resourcepack_server.core.proxy$java.io.InputStream$ff19274a.close(Unknown Source)
    at aleph.http.core$send_streaming_body$fn__15872.invoke(core.clj:324)
    at manifold.utils$invoke_callbacks$fn__1143.invoke(utils.clj:69)
    at manifold.utils$invoke_callbacks.invokeStatic(utils.clj:68)
    at manifold.utils$invoke_callbacks.invoke(utils.clj:65)
    at aleph.netty.ChannelSink.markClosed(netty.clj:344)
    at aleph.netty.ChannelSink.close(netty.clj:352)
    at aleph.netty$sink$fn__15376.invoke(netty.clj:407)
    at manifold.deferred$eval1788$chain_SINGLEQUOTE____1809.invoke(deferred.clj:749)
    at manifold.deferred$eval1788$subscribe__1789$fn__1794.invoke(deferred.clj:715)
    at manifold.deferred.Listener.onSuccess(deferred.clj:219)
    at manifold.deferred.Deferred$fn__1634.invoke(deferred.clj:398)
    at manifold.deferred.Deferred.success(deferred.clj:398)
    at manifold.deferred$success_BANG_.invokeStatic(deferred.clj:243)
    at manifold.deferred$success_BANG_.invoke(deferred.clj:240)
    at aleph.netty$wrap_future$reify__15320.operationComplete(netty.clj:218)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at manifold.executor$thread_factory$reify__1009$f__1010.invoke(executor.clj:47)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:834)

available   Thread[manifold-wait-6,5,main]
read3       Thread[manifold-wait-6,5,main]
close       Thread[manifold-wait-6,5,main]
close       Thread[manifold-wait-6,5,main]

Body plus a proxy class that delegates to the original stream (for debugging):

{:status 200
 :body (proxy [java.io.InputStream] []
         (read
           ([]
            (a/put! log-ch (str "read0\t\t" (Thread/currentThread)))
            (.read stream))
           ([^bytes b]
            (a/put! log-ch (str "read1\t\t" (Thread/currentThread)))
            (.read stream b))
           ([^bytes b off len]
            (a/put! log-ch (str "read3\t\t" (Thread/currentThread)))
            (.read stream b off len)))
         (available []
           (a/put! log-ch (str "available\t" (Thread/currentThread)))
           (.available stream))
         (skip [^long l]
           (a/put! log-ch (str "skip\t\t" (Thread/currentThread)))
           (.skip stream l))
         (close []
           (a/put! log-ch (str "close\t\t" (Thread/currentThread)))
           (.close stream)))
 :headers {"Content-Type" "application/zip"}}
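For readers less familiar with Clojure, the same instrumentation can be sketched in plain Java (a hypothetical wrapper, not Aleph code): it records which thread performed the last read and which performed close, reproducing the pattern visible in the logs above, where reads happen on a manifold thread and close arrives from a Netty event-loop thread.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

public class CloseFromOtherThread {
    // Hypothetical wrapper mirroring the Clojure proxy above: it records the
    // name of the thread that performs each operation.
    static class ThreadLoggingStream extends FilterInputStream {
        final AtomicReference<String> readThread = new AtomicReference<>();
        final AtomicReference<String> closeThread = new AtomicReference<>();

        ThreadLoggingStream(InputStream in) { super(in); }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            readThread.set(Thread.currentThread().getName());
            return super.read(b, off, len);
        }

        @Override public void close() throws IOException {
            closeThread.set(Thread.currentThread().getName());
            super.close();
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadLoggingStream s =
            new ThreadLoggingStream(new ByteArrayInputStream(new byte[]{1, 2, 3}));

        // Stands in for the manifold worker that drains the body.
        Thread reader = new Thread(() -> {
            try { s.read(new byte[3], 0, 3); }
            catch (IOException e) { throw new RuntimeException(e); }
        }, "reader-thread");
        reader.start();
        reader.join();

        // Stands in for the Netty event loop closing the body after the
        // client disconnects prematurely.
        Thread closer = new Thread(() -> {
            try { s.close(); }
            catch (IOException e) { throw new RuntimeException(e); }
        }, "event-loop-thread");
        closer.start();
        closer.join();

        System.out.println(s.readThread.get() + " / " + s.closeThread.get());
        // prints: reader-thread / event-loop-thread
    }
}
```

A plain ByteArrayInputStream tolerates this; okio's AsyncTimeout-backed streams do not, which is exactly what the stack trace shows.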

Also a simple logging snippet, which works fine in the test scenario:

(def log-ch
  (a/chan))

(a/go-loop [m (a/<! log-ch)]
  (when m
    (println m)
    (recur (a/<! log-ch))))
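The core.async loop above just moves printing onto its own thread so the logging itself can't perturb the stream's threads. The same idea in rough Java terms (a sketch only; a BlockingQueue and a sentinel value stand in for the channel and its close semantics):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LogLoop {
    public static void main(String[] args) throws Exception {
        // Queue drained by a dedicated printer thread, analogous to log-ch
        // plus the go-loop above.
        BlockingQueue<String> logCh = new LinkedBlockingQueue<>();
        final String CLOSED = "__closed__";  // sentinel standing in for closing the channel

        Thread printer = new Thread(() -> {
            try {
                String m;
                while (!(m = logCh.take()).equals(CLOSED)) {
                    System.out.println(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "log-printer");
        printer.start();

        // Producers (the instrumented stream methods) just enqueue messages.
        logCh.put("read3\t\t" + Thread.currentThread());
        logCh.put(CLOSED);
        printer.join();
    }
}
```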

I think this behaviour is not right.

kachayev commented 3 years ago

A similar issue was described in #454. It's unclear whether Netty provides a good way of dealing with a situation like this; at least I didn't find one (the corresponding issue in the Netty repo has been open since 2017).

mikroskeem commented 3 years ago

So it starts from Netty... bummer.

kachayev commented 3 years ago

There are multiple things happening here. Since InputStream has an inherently blocking API, the code that fills in data from the network and the code that reads it have to run on different threads. I think an exception thrown on the async thread pool could be captured and propagated back to the InputStream's thread to be re-thrown there... but that would require some juggling. I'm testing a few improvements to error handling; I'll see if I can cover this specific use case properly. Thanks for the report!
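One shape that capture-and-propagate idea could take, sketched in plain Java (hypothetical; the names `pendingError`, `closeOnPool`, and `readOnCallerThread` are illustrative, not Aleph's implementation): the failure raised on the async pool is stashed in an AtomicReference and surfaced to the caller the next time it touches the stream on its own thread.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class PropagateClose {
    // Holds at most one pending failure from the async side.
    static final AtomicReference<IOException> pendingError = new AtomicReference<>();

    // Imagine this running on the Netty/manifold thread and failing, as in
    // the okio "Unbalanced enter/exit" case above.
    static void closeOnPool() {
        pendingError.compareAndSet(null, new IOException("close failed on async pool"));
    }

    // The InputStream-facing side: re-throw the stashed error on the caller's
    // own thread instead of letting it die on the pool.
    static int readOnCallerThread() throws IOException {
        IOException e = pendingError.getAndSet(null);
        if (e != null) throw e;
        return -1;  // end of stream in this toy example
    }

    public static void main(String[] args) throws Exception {
        Thread pool = new Thread(PropagateClose::closeOnPool, "async-pool");
        pool.start();
        pool.join();
        try {
            readOnCallerThread();
            System.out.println("no error");
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The point is only that the reader thread, not the pool thread, is the one that sees the exception; how Aleph would wire this into its sink machinery is a separate question.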