clj-commons / manifold

A compatibility layer for event-driven abstractions
1.02k stars 106 forks source link

BlockingQueueSource should be closable #176

Open SevereOverfl0w opened 5 years ago

SevereOverfl0w commented 5 years ago
(def a (s/->source (java.util.concurrent.LinkedBlockingQueue. [1 2 3])))
(def b (s/batch 100 1 a))

(s/close! a)
(s/description a)
;; {:type "java.util.concurrent.LinkedBlockingQueue", :buffer-size 0, :source? true}
(s/description b)
;; {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true}

I guess this is because the queue is not closable. Would be nice to have the downstreams disconnect in this case though.

KingMob commented 3 years ago

Unless overridden, the default impl of .close does nothing, and it's not currently overridden for BlockingQueueSource. I agree the "source-wrapped" version should be closable, though, and prevent further takes.

EDIT 2023-2-28: Oops, slosing should prevent puts, not takes. Takes can happen until drained

cosineblast commented 1 year ago

This doesn't seem to be a particular issue with BlockingQueueSource honestly.

(defn foo []
  (let [a (s/stream 4)
        b (s/batch 100 1 a)]

    (s/put-all! a [1 2 3 4])

    (s/close! a)

    (println (s/description b))))

This produces exactly the same behavior, using a regular stream.

However, by removing the (s/put-all! a [1 2 3 4]) call, b is closed when a is closed. I believe the reason for that is because when there's nothing to take from a, the pending takes implied by batch end up being stored in a's consumers, and then the close! call resolves those consumers with their default values, allowing the graph to close downstream, in this case, b. This doesn't happen when the take!s are resolved by put!s since the consumers are properly resolved and not reported to the graph.

KingMob commented 1 year ago

There's a bit of a race condition here, though. Behind the scenes, multiple threads are being used, so it's completely possible for the description of b to be printed out before the closing of a has fully propagated its effects to b. I ran that code snippet multiple times, and got different results.

In general, ordering around closing can get a little weird. Unfortunately, the current default close! behavior doesn't return a deferred we could wait on...should probably fix that.