clj-commons / manifold

A compatibility layer for event-driven abstractions
1.02k stars 106 forks source link

manifold.stream.graph/handle->downstreams size always grow #169

Open defclass opened 5 years ago

defclass commented 5 years ago

Hello Zach, I am not sure I encounter a memory leak problem. In my use case, we want to connect to server, and send some packet repeatly by aleph websocket implement. After connected, use two stream to transform data.

(defn- wrap-websocket-stream
  [s]
  (let [out (s/stream)]
    (s/connect (s/map json/generate-string out) s)
    (s/splice out (parse-json-stream s))))

We found var size of manifold.stream.graph/handle->downstreams will always grow.

Also, can repreduce the case to use below code.

(dotimes [i 5] 
  (let [a (s/stream)
       b (s/stream)]
   (s/connect a b)
   @(s/put! a "a")
   (s/close! b)
   (s/close! a)
   (prn (.size manifold.stream.graph/handle->downstreams))))

It's entirely possible I'm using stream incorrectly, thanks.

ztellman commented 5 years ago

This is because the pseudo-buffer inside b (which will hold onto 16384 messages that won't fit in the actual buffer) is still holding onto the message even after the stream is closed. If you add a (s/take! b) after the put!, you'll see everything clean itself up as expected.

I'm not sure if this is actually the problem you're having, as usually streams in Aleph ground out in a socket that won't exhibit this behavior, but I still think it's confusing and probably incorrect. I'm pushing a change that will fix the sample code you've provided, I'd be interested to know if it fixes the memory leak you're seeing in production.

EDIT: it seems like the timing the current behavior is relied upon in some tests, I need to dig deeper to see if that's incidental, or there's a reason for this behavior I've managed to forget. I'll update once I have more to share.

defclass commented 5 years ago

Thanks for reply.

If you add a (s/take! b) after the put!, you'll see everything clean itself up as expected.

Yes , it is the case. We use aleph as the client to do some monitor tests, which we don't need to handle the incoming message, just send data after connected, then close the connection. I mistakenly take it for granted , which invoking s/close! not only close the stream , but also will clean up all the resource.

Not sure if it is reasonable that manifold provide some explicit function to release resource of specific stream.