clj-commons / manifold

A compatibility layer for event-driven abstractions

Inconsistent behaviour? #35

Closed danjohansson closed 9 years ago

danjohansson commented 9 years ago

I'm having trouble working out when to close the output stream in the example below. I don't know whether it is OK to close the file in the on-drained callback. I also noticed different behaviour between a stream and a source, as described in the code below.

What's the recommended pattern for writing a stream to a file, as attempted here?

(require '[byte-streams :as bs])
(require '[clojure.java.io :as io])
(require '[manifold.deferred :as d])
(require '[manifold.stream :as s])

(defn bug? [s]
  (let [out (io/output-stream "c:/tmp/f3.txt")]
    (->>
     s
     (s/map bs/to-byte-array)
     ((fn [s] (s/on-drained s 
                            (fn [] 
                              (println "2") (flush)
                              (.flush out) (.close out)
                              ))
        s))
     ((fn [source]
        (s/consume (fn [v] 
                     (.write out v)
                     (when (s/drained? source)
                       (println "1") (flush)
                       (.flush out) (.close out)
                       )) source))))))

;; case 1 prints: "2"
(let [s (s/stream)]
  (s/put! s "test")
  (bug? s)
  (s/close! s))

;; case 2 prints: "2" and "1", then throws an exception because of writing to a closed file
(let [s  (-> (byte-array [1 2 3])
             (bs/to-byte-buffers)
             (s/->source))]
  (bug? s)
  (s/close! s))

ztellman commented 9 years ago

on-drained is called once the stream has been drained of messages, which is not necessarily the same as when all of its messages have been fully processed downstream (how would the stream know when you're done with its contents?). Checking whether it's drained in the consume callback won't always work, since it might be still open when it receives its last message, and only be closed subsequently, which won't pass anything to the callback.

Short answer is: use (byte-streams/transfer s (io/file ...)). Longer answer is that you need to use a loop to consume the messages one by one, and only do the resource closing once you read from the source, get the "drained" value, and then clean up.
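The loop approach described above could be sketched roughly as follows. This is only an illustration, not code from the thread: `write-stream-to-file` is a hypothetical helper name, `::drained` is an arbitrary sentinel passed as the default value to `s/take!`, and the blocking `.write` is assumed to be acceptable on whichever thread runs the callback.

```clojure
(require '[byte-streams :as bs]
         '[clojure.java.io :as io]
         '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn write-stream-to-file
  "Hypothetical helper: takes messages from `source` one at a time and
  writes each to the file at `path`. The file is closed only after a take
  returns the ::drained sentinel, or after an error.
  Returns a deferred that yields ::done on success."
  [source path]
  (let [out (io/output-stream path)]
    (-> (d/loop []
          ;; s/take! yields ::drained once the source is closed and empty
          (d/chain (s/take! source ::drained)
            (fn [msg]
              (if (identical? ::drained msg)
                ::done
                (do
                  (.write out ^bytes (bs/to-byte-array msg))
                  (d/recur))))))
        ;; runs whether the loop succeeded or failed;
        ;; closing the buffered stream also flushes it
        (d/finally (fn [] (.close out))))))
```

With this shape, the resource cleanup is tied to the consumer actually observing the end of the source, rather than to `on-drained` or a `drained?` check inside the consume callback. Dereferencing the returned deferred, for example `@(write-stream-to-file s "out.bin")` with a path of your choosing, blocks until the file has been written and closed.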

danjohansson commented 9 years ago

Thanks for clearing things up! Also thanks for adding the new documentation on custom execution models.