clj-commons / manifold

A compatibility layer for event-driven abstractions

Stream pipeline correctly connected by take! hangs #147

Open lxsameer opened 6 years ago

lxsameer commented 6 years ago

Hi, first of all, kudos. I have several components, each of which has an input stream and an output stream. I connected the inputs and outputs of the components like this:

The input of component A is connected to the output of A, the output of A to the input of B, the input of B to the output of B, the output of B to the input of C, and finally the input of C to the output of C. In other words: A-in -> A-out -> B-in -> B-out -> C-in -> C-out.

I confirmed this by walking the pipeline with the downstream function. Here is the output of the walk:

STREAM: manifold.stream.default.Stream@2db12c5b  ->  manifold.stream.default.Stream@5572f9d1

STREAM: manifold.stream.default.Stream@5572f9d1  ->  manifold.stream.default.Stream@656ed510

STREAM: manifold.stream.default.Stream@656ed510  ->  manifold.stream.default.Stream@8d5cdb0

STREAM: manifold.stream.default.Stream@8d5cdb0  ->  manifold.stream.default.Stream@46a7703a

STREAM: manifold.stream.default.Stream@46a7703a  ->  manifold.stream.default.Stream@60399d13
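A walk like the one above can be sketched with manifold.stream's downstream function, which returns a source's connections as [description sink] pairs (or nil when nothing is connected). This is a minimal sketch, not the code used in the issue: the helper walk-pipeline and the streams a, b and c are hypothetical, and it assumes a linear pipeline in which each stream has at most one downstream sink.

```clojure
(require '[manifold.stream :as s])

(defn walk-pipeline
  "Hypothetical helper: starting at `src`, follows the first downstream
  sink of each stream and returns the [source sink] edges it visits.
  Assumes a linear pipeline (at most one sink per stream)."
  [src]
  (loop [current src
         edges   []]
    ;; s/downstream returns [description sink] pairs, or nil if nothing
    ;; is connected downstream of `current`.
    (if-let [[_ sink] (first (s/downstream current))]
      (recur sink (conj edges [current sink]))
      edges)))

;; A small three-stream pipeline: a -> b -> c.
(def a (s/stream))
(def b (s/stream))
(def c (s/stream))
(s/connect a b)
(s/connect b c)

;; Print one line per connection, roughly in the style of the walk above.
(doseq [[from to] (walk-pipeline a)]
  (println "STREAM:" (str from) "->" (str to)))
```

The printed text only approximates the format shown in the issue; how each stream prints depends on the Manifold version.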

The problem is that when I put! something into any of these streams and then try to take it from either that stream or one downstream of it, take! hangs and try-take! returns the timeout value, even though the deferred returned by put! derefs to true.

My component functions are fairly simple:

  (fn [component]
    ;; component is a map.
    (let [input  (hcomp/input component)
          output (hcomp/output component)]
      (println "INPUT: " (str input))
      (println "OUTPUT: " (str output))
      (stream/connect input output)
      component))

If I remove the call to connect from the function (so no pipeline is created), I can take a value from the same stream I put it into.

NOTE: I tried to debug this issue with the latest Manifold version and found that producers in https://github.com/ztellman/manifold/blob/master/src/manifold/stream/default.clj#L204 is empty for the downstream stream. My guess is that the stream's put method returns true without actually adding the value to producers, but I haven't confirmed this.

ztellman commented 6 years ago

I just tried to reproduce your issue based on your description:

> (def a (s/stream))

> (def b (s/stream))

> (def c (s/stream))

> (s/put-all! a [1 2 3])
<< … >>

> (s/connect a b)
nil

> (s/connect b c)
nil

> (->> c s/stream->seq (take 3))
(1 2 3)

This seems to work as expected. Can you expand on how my example differs from what you're doing?

lxsameer commented 6 years ago

Thanks for the quick response. A friend on Clojurians pointed out that once several streams are connected together, I can only consume messages from the final output. In my example, that means I can only consume from C's output, which works just fine. So it was my mistake and my misunderstanding :pray:. Still, it would be a good idea to add a note about this to the docs.
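The behaviour described in the issue can be reproduced with a short sketch. It assumes Manifold's default unbuffered streams; the streams a, b and c, the message :msg, and the ::none/::timeout sentinels are arbitrary placeholders. As far as I understand the default streams, connect starts consuming from its source right away, so the connection (not the REPL) is the consumer of every intermediate stream.

```clojure
(require '[manifold.stream :as s])

;; Three unbuffered streams chained a -> b -> c, as in the issue.
(def a (s/stream))
(def b (s/stream))
(def c (s/stream))
(s/connect a b)
(s/connect b c)

;; The a -> b connection is already waiting to take from a, so the
;; put! is accepted and derefs to true.
(def put-result @(s/put! a :msg))

;; The message has already been forwarded downstream, so nothing is left
;; in a: try-take! gives up after 100 ms and returns the timeout value.
(def from-a @(s/try-take! a ::none 100 ::timeout))

;; The end of the chain still receives the message.
(def from-c @(s/try-take! c ::none 100 ::timeout))
```

Here put-result is true and from-a is ::timeout, matching the report, while from-c is :msg, matching the observation that consuming from C's output works.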

dm3 commented 6 years ago

Hey, I was the one who answered the question.

@lxsameer - it would be interesting to hear how you thought the streams worked and what your initial design, based on that mental model, looked like. You probably aren't the only one making these assumptions, and it will be easier to clarify the documentation once we've cleared this up.

lxsameer commented 6 years ago

I read the docs on aleph.io many times, and my assumption was that when you connect a series of streams to each other, you can still inspect each stream along the way. For example, if I connect A -> B -> C and put a value into A, I should be able to take it from B without affecting the pipeline (I should still be able to get it from C too). This matters because in my use case I connect the components' inputs and outputs dynamically, based on a workflow graph.

ztellman commented 6 years ago

If you connect B to multiple streams, the messages will be sent to both, but all operators in manifold.stream operate via side effects. If I'm understanding what you're saying, you seem to be expecting them to work like Clojure's seqs, which don't disappear when transformed elsewhere.
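Connecting one source to several sinks gives a way to observe an intermediate stage without stealing messages from the pipeline. A minimal sketch, assuming default Manifold streams; the observer stream, its buffer size of 16, and the ::none/::timeout sentinels are hypothetical choices for illustration.

```clojure
(require '[manifold.stream :as s])

;; Pipeline a -> b -> c, plus a hypothetical `observer` that is a second
;; sink of b. Each message taken from b is put into both c and observer.
(def a (s/stream))
(def b (s/stream))
(def c (s/stream))
(def observer (s/stream 16)) ; buffered so an idle observer doesn't stall b immediately

(s/connect a b)
(s/connect b c)
(s/connect b observer)

@(s/put! a :msg)

;; Register both takes before blocking on either, so the result does not
;; depend on the order in which b feeds its sinks.
(def from-c        (s/try-take! c ::none 1000 ::timeout))
(def from-observer (s/try-take! observer ::none 1000 ::timeout))

(def results [@from-c @from-observer])
```

Both takes should receive :msg. Because connect applies backpressure, an observer that is never drained will eventually make b wait on it and stall c as well; the :timeout option of s/connect (milliseconds spent trying to put a message into a sink before closing it) is one way to keep a forgotten observer from blocking the rest of the pipeline.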

If things did work this way, then each stream in your software would effectively hold onto every message forever, eventually leading to a memory leak. This happens fairly often with large lazy sequences, where you mistakenly hold onto the head of the sequence. This is why Manifold mimics the semantics of core.async, Java queues, and other similar abstractions by removing messages once they're consumed.
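The difference between seq semantics and queue semantics fits in a few lines. This sketch assumes a default Manifold stream with a buffer large enough (10, an arbitrary choice) that both puts complete without a consumer; the names xs, st and the ::none/::timeout sentinels are placeholders.

```clojure
(require '[manifold.stream :as s])

;; A seq is a persistent value: reading it does not change it.
(def xs (list 1 2))
(def seq-reads [(first xs) (first xs)]) ; [1 1]

;; A stream behaves like a queue: each take! removes a message.
(def st (s/stream 10))
@(s/put-all! st [1 2])
(def stream-reads [@(s/take! st) @(s/take! st)]) ; [1 2]

;; Both messages have been consumed, so a further take times out.
(def leftover @(s/try-take! st ::none 100 ::timeout)) ; ::timeout
```

Since each message is handed to exactly one taker, a connection and a REPL take! on the same stream compete for messages rather than both seeing them.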

lxsameer commented 6 years ago

Thanks @ztellman @dm3. I understand the flow now. I'd still suggest adding a small note to the docs to help future users understand the concept and avoid the same mistake I made.