clj-commons / manifold

A compatibility layer for event-driven abstractions

If the thread that is waiting at `@(s/take! some-stream)` is cancelled, the next value put to the stream is lost #149

Closed. jarppe closed this 6 years ago

jarppe commented 6 years ago

If I have a thread that is waiting for a value from a deferred and that thread is interrupted, the next value put to the stream is lost.

Here is a minimal example that shows this:

(require '[manifold.stream :as s])

(let [s (s/stream* {:buffer-size 0})
      f (future
          (let [d (s/take! s)]
            (println "f waiting...")
            (println "f got:" @d)))]

  ; make sure f has got the deferred and is de-reffing it:
  (Thread/sleep 10)
  (future-cancel f)

  (future
    (loop []
      (when-let [v @(s/take! s)]
        (println "got:" v)
        (recur)))
    (println "exited"))

  (future
    (dotimes [n 3]
      (println "putting" n "->" @(s/put! s n))))

  (Thread/sleep 100)
  (s/close! s)
  (println "done"))

This prints:

f waiting...
putting 0 -> true
putting 1 -> true
got: 1
putting 2 -> true
got: 2
done
exited

The first future gets the deferred from s/take! and is waiting for a value from it, but that future is cancelled.

Next I send values 0, 1, and 2 to the stream. The future taking and printing values from that stream receives only 1 and 2; the value 0 is lost.

Is this expected, or have I misunderstood something?

I ran into this problem when I created a set of workers using an Executor and, in cleanup, just called (.shutdownNow executor), which causes all worker threads to be interrupted. After that I recreated some worker threads and noticed that the first items sent to the stream were not processed by those workers.

ztellman commented 6 years ago

It's possible that the code is not robust to interrupt being called on the thread. I'll take a look.

ztellman commented 6 years ago

Sorry for the delay, but I've looked at your example more closely and this seems to be working as expected. By calling take!, you've claimed the head of the stream, even if you fail to do anything with the deferred. To actually cancel the consumption of the first element in the stream, you have to fill the deferred with another value before a message comes through.
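
A minimal sketch of the approach described above, assuming that a stream skips a pending take! whose deferred has already been realized, as the comment suggests. `take-or-release!` is a hypothetical helper, not part of manifold, and `:cancelled` and `:timed-out` are arbitrary marker values chosen for illustration. It assumes that dereferencing a deferred on an interrupted thread throws InterruptedException.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn take-or-release!
  "Hypothetical helper: blocks on a take!, and if the waiting thread is
  interrupted, fills the pending deferred with :cancelled so that it no
  longer holds a claim on the next message."
  [stream]
  (let [pending (s/take! stream)]
    (try
      @pending
      (catch InterruptedException e
        ;; success! returns false if a message won the race and had already
        ;; realized the deferred; in that case the message is dropped here.
        (d/success! pending :cancelled)
        (throw e)))))

;; Single-threaded illustration of the same idea, without any interruption:
(def demo-result
  (let [stream  (s/stream)
        pending (s/take! stream)]        ; claims the head of the stream
    (d/success! pending :cancelled)      ; give up the claim before a message arrives
    (s/put! stream 0)                    ; left pending until somebody takes it
    ;; Timed deref so the sketch cannot hang if the assumption is wrong.
    [@pending (deref (s/take! stream) 1000 :timed-out)]))
```

If the assumption holds, the second take! receives 0 instead of the abandoned deferred. A worker that is shut down via (.shutdownNow executor) could call such a helper in place of a bare `@(s/take! s)`, although there is still a small window in which a message arrives just before the deferred is filled.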