clj-commons / manifold

A compatibility layer for event-driven abstractions
1.02k stars 106 forks source link

Navigating the stream graph upstream #85

Open dm3 opened 8 years ago

dm3 commented 8 years ago

Would it make sense to add an manifold.stream/upstream analogous to already existing manifold.stream/downstream? I don't think it would be needed for any functionality currently, but would be great for graph navigation purposes.

In our use-case we need to reconstruct stream topology starting from the sink. It seems limiting being able to get the graph from the source but not from the sink.

Would you accept such a PR?

ztellman commented 8 years ago

The reason this doesn't already exist is because it would introduce memory leaks unless it is very carefully designed. I've been poking at something in this vein, but don't have a timeline for release. I would be unlikely to accept a PR unless it came with a convincing argument for why no memory leaks would result.

If you can give me more detail about your use case, though, I may be able to suggest an alternative. On Thu, Jul 14, 2016 at 1:09 AM Vadim Platonov notifications@github.com wrote:

Would it make sense to add an upstream analogous to already existing downstream? I don't think it would be needed for any functionality currently, but would be great for graph navigation purposes.

In our use-case we need to reconstruct stream topology starting from the sink. It seems limiting being able to get the graph from the source but not from the sink.

Would you accept such a PR?

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/ztellman/manifold/issues/85, or mute the thread https://github.com/notifications/unsubscribe/AAB6P2x83ULdEGA9_o77Qnuurk5nddQ0ks5qVe7GgaJpZM4JMLrz .

dm3 commented 8 years ago

Actually, I wrote the following thing as a crutch:

(defn- unwrap [stream]
  (loop [s stream]
    (cond (instance? manifold.stream.Callback s)
          (recur (first (.downstream s)))

          (instance? manifold.stream.SourceProxy s)
          (recur (.source s))

          (instance? manifold.stream.SinkProxy s)
          (recur (.sink s))

          :else s)))

(defn upstream [sink]
  (let [sink (unwrap sink)
        is-sink? (fn [^manifold.stream.graph.Downstream d]
                   (= (unwrap (.sink d)) sink))
        source #(when (some is-sink? (val %))
                  (when-let [k (.get (key %))]
                    [k]))]
    (->> (.entrySet manifold.stream.graph/handle->downstreams)
         (mapcat source))))

which serves our purpose. This will produce inconsistent results if the streams are connected/disconnected concurrently but we have our topology finalized by the time we need this.

EDIT: some unwrapping needed to make the upstream above work for non-trivial cases.

EDIT2: relying on the graph being preserved through all the transformations is error-prone. Consider stream/zip which doesn't connect the input streams to the output stream, but rather relies on the deferred/loop and deferred/zip to do its work.