clj-commons / manifold

A compatibility layer for event-driven abstractions

`on-closed` not working on sources? #100

Closed vspinu closed 8 years ago

vspinu commented 8 years ago

I am getting the following with manifold "0.1.5"

(def tt (s/periodically 200 (constantly 1)))
(s/on-closed tt #(prn "closed"))
;; ClassCastException manifold.stream.SourceProxy cannot be cast to manifold.stream.core.IEventSink  coinbot.gdax.feed/eval31658 (form-init5390501534687401845.clj:53)
ztellman commented 8 years ago

Sinks are closed, sources are drained. You probably want to use `on-drained` in this situation.

vspinu commented 8 years ago

`on-drained` doesn't work when the source is closed. I can call `s/close!` on a source, so presumably a source can be closed. It is also closed automatically on error, which is why I need an on-close callback.

The following doesn't print anything:

  (def tt (s/periodically 200 (constantly 1)))
  (s/on-drained tt #(prn "drained"))
  (s/close! tt)

The printed representation of a source has `:closed? true`, which again suggests that sources are closable. But `s/closed?` doesn't work on sources:

> tt
;; << source: {:pending-puts 1, :drained? false, :buffer-size 1, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 1, :source? true} >>
> (s/closed? tt)
;; Unhandled java.lang.ClassCastException
;;  manifold.stream.SourceProxy cannot be cast to manifold.stream.core.IEventSink
ztellman commented 8 years ago

You haven't actually consumed the contents of tt, so it hasn't been drained. There's a slight asymmetry, as you point out, since you're able to close! a source, but the idea is that it shouldn't matter to you when a source has been closed, only when it doesn't have any more messages to produce.
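
A minimal sketch of the close/drain distinction described above, assuming a plain buffered stream from `s/stream` instead of `periodically`. The names `st` and `events` are illustrative and do not come from the thread; the comments describe the behaviour I would expect, not verified output.

```clojure
(require '[manifold.stream :as s])

;; Illustrative names: `st` and `events` are not from the original thread.
(def events (atom []))
(def st (s/stream 10))                  ; a full stream: both sink and source

(s/on-closed  st #(swap! events conj :closed))
(s/on-drained st #(swap! events conj :drained))

@(s/put! st 1)                          ; fits in the buffer, so the put completes
@(s/put! st 2)

(s/close! st)                           ; no further puts are accepted
(s/closed? st)                          ; true, yet two messages are still buffered
(s/drained? st)                         ; expected false until they are consumed

@(s/take! st)                           ; 1
@(s/take! st)                           ; 2
(s/drained? st)                         ; expected true once nothing is left to take
```

Because `st` is a full stream here, `s/closed?` and `s/on-closed` are usable on it; a source-only value such as the result of `periodically` exposes only the drain side.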

vspinu commented 8 years ago

> You haven't actually consumed the contents of tt, so it hasn't been drained.

I see now. The source can be closed but still consumed up to the buffer limit. The following indeed works:

  (def ts (s/periodically 1000 #(do (prn "here") 1)))
  (s/on-drained ts #(prn "drained"))
  (s/close! ts)
  ;; can still take! twice
  (s/take! ts)
  (s/take! ts) ; => "drained"

Looks like the buffer of periodically is 2. That's why I can still take! two values at the end.
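
The printed state earlier in the thread shows `:buffer-capacity 1` together with `:pending-puts 1`, which may be where the two remaining values come from (one buffered message plus one put still waiting). One way to check, as a sketch that assumes `s/description` returns the same map that the printed representation shows:

```clojure
(require '[manifold.stream :as s])

(def ts (s/periodically 1000 (constantly 1)))
(s/close! ts)

;; Messages held in the buffer, plus puts still waiting on the source.
;; The values depend on timing, so no expected output is shown here.
(select-keys (s/description ts)
             [:buffer-capacity :buffer-size :pending-puts])
```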

> but the idea is that it shouldn't matter to you when a source has been closed, only when it doesn't have any more messages to produce.

This now makes perfect sense. Thanks.