clj-commons / manifold

A compatibility layer for event-driven abstractions

Upstream/downstream closing behaviors unaware of sibling streams #235

Status: Closed (closed by yenda 7 months ago)

yenda commented 8 months ago

Is this a bug with stream/batch?

With the following topology:

(require '[manifold.stream :as stream])

(do
  (defn st []
    (let [s (stream/periodically 500 (constantly 2))
          s1 (stream/stream)
          sb (stream/stream)
          s2 (stream/stream)
          s3 (stream/stream)
          s4 (stream/stream)]
      (stream/connect sb s1)
      (stream/connect sb s2)
      (stream/connect (->>  s
                            (stream/batch 2 1000)
                            (stream/map (fn [v] (println :s->sb v) v)))
                      sb)
      (stream/connect (->>  s2
                            (stream/batch 2 1000)
                            (stream/map (fn [v] (println :s2->s3 v) v)))
                      s3)
      (stream/connect (->> s1
                           (stream/map (fn [v] (println :s1->s4 v) v)))
                      s4)
      (stream/connect (->> s3
                           (stream/map (fn [v] (println :s3->s4 v) v)))
                      s4)
      (stream/consume (fn [v] (println :s4 v) v) s4)

      [s sb s1]))
  (def ss (st))
  ss)

(stream/close! (first ss))

Here is the result:

:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s2->s3 [[2 2] [2 2]]
:s3->s4 [[2 2] [2 2]]
:s4 [[2 2] [2 2]]
:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s2->s3 [[2 2]]
:s3->s4 [[2 2]] ;; s4 never consumes the last [[2 2]]

I would expect s4 to get the last [[2 2]] before closing, but it doesn't.

It looks like the outcome depends on whether s is closed at a point where s3 is mid-batch or not?

:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s2->s3 [[2 2] [2 2]]
:s3->s4 [[2 2] [2 2]]
:s4 [[2 2] [2 2]]
:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s->sb [2]
:s1->s4 [2]
:s4 [2]
:s2->s3 [[2 2] [2]]
:s3->s4 [[2 2] [2]]
:s4 [[2 2] [2]] ;; s4 drained before closing

yenda commented 8 months ago

Another possible outcome:

:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s->sb [2 2]
:s1->s4 [2 2]
:s4 [2 2]
:s2->s3 [[2 2] [2 2]]
:s3->s4 [[2 2] [2 2]]
:s4 [[2 2] [2 2]]
:s->sb [2]
:s1->s4 [2]
:s4 [2]
:s2->s3 [[2]]
:s3->s4 [[2]]

KingMob commented 8 months ago

Flushing what it has so far when closed is usually how Manifold operates. The alternative is losing data by default.

I think if you want to force it to drop partial batches when closing, you'll need custom code. Either adapt a core partition* fn to be a transducer that drops incomplete batches (unlike partition-all), or use a callback with consume-via that only emits full batches.
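
A minimal sketch of the first suggestion, for readers who do want partial batches dropped. The name `full-batches-only` is hypothetical, and the sketch assumes that `stream/transform` runs the transducer's completion step when its source is drained. Unlike `stream/batch`, it has no latency bound, so it only groups by count.

```clojure
(require '[manifold.stream :as stream])

;; Hypothetical helper: partition-all emits the trailing partial batch on
;; completion, and the filter step then discards any batch shorter than n.
(defn full-batches-only [n]
  (comp (partition-all n)
        (filter #(= n (count %)))))

;; The transducer can be checked on a plain collection, without streams:
(into [] (full-batches-only 2) [1 2 3 4 5])
;; => [[1 2] [3 4]]

;; Applied to a Manifold source built from a seq (closes when the seq ends):
(def batched
  (stream/transform (full-batches-only 2)
                    (stream/->source [1 2 3 4 5])))

;; Blocks until `batched` is drained, then returns the batches it emitted.
(stream/stream->seq batched)
```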

I can take a closer look when I have time, but it might be a while; I need to focus on paying work.

yenda commented 8 months ago

> Flushing what it has so far when closed is usually how Manifold operates. The alternative is losing data by default.

I'm not sure I understand. Are you saying that it is indeed a bug, and that Manifold is expected to flush rather than lose the data?

> I think if you want to force it to drop partial batches when closing, you'll need custom code. Either adapt a core partition* fn to be a transducer that drops incomplete batches (unlike partition-all), or use a callback with consume-via that only emits full batches.

I want the opposite: I want it to always flush, sending full or partial batches, but never losing data. Right now it does lose data: it is not flushing in my first output shown above. It flushes sometimes, but there are cases where it doesn't, as the repro above shows.

So I guess it is a bug indeed.

yenda commented 8 months ago

@KingMob here is my hacky way of reproducing this every time

(do
  (defn st []
    (let [cnt (atom 0)
          s (stream/periodically 500 (fn [] (swap! cnt inc)))
          s1 (stream/stream)
          sb (stream/stream)
          s2 (stream/stream)
          s3 (stream/stream)
          s4 (stream/stream)]
      (stream/connect sb s1)
      (stream/connect sb s2)
      (stream/connect (->>  s
                            (stream/map (fn [v] (when (= 5 @cnt) (stream/close! s)) v))
                            (stream/batch 2 1000)
                            (stream/map (fn [v] (println :s->sb v) v)))
                      sb)
      (stream/connect (->>  s2
                            (stream/batch 2 1000)
                            (stream/map (fn [v] (println :s2->s3 v) v)))
                      s3)
      (stream/connect (->> s1
                           (stream/map (fn [v] (println :s1->s3 v) v)))
                      s3)
      (stream/consume (fn [v] (println :s3 v) v) s3)

      (println "====================")
      [s sb s1]))
  (def ss (st))
  ss)

yenda commented 8 months ago

AFAICT, only def-sink+source implements a graceful close that drains before closing. Print debugging shows that there is nothing to drain: (.isEmpty acc) is true for all the streams/sinks that are being closed.

KingMob commented 8 months ago

Ahh, I think I understand you now.

It seems your concern is not about partial batches, but that some printlns didn't print, which you take to mean data was lost in the stream.

But, that's not how closing/draining works. If you look at the docs for connect, you'll see the downstream? parameter. When set to true, which is the default, closing a source will close its downstream sinks. So, as the close of the initial s propagates downwards, the final stream is closed by one of its parents, blocking the other from writing to it.

Try using {:downstream? false} with connect in that case, and be sure to close it manually when you're done.
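
A minimal sketch of that suggestion, with two sources feeding one sink. The function name `manual-close-demo` and the buffer size are assumptions made for illustration; knowing when both sources are finished is left to the caller here.

```clojure
(require '[manifold.stream :as stream])

(defn manual-close-demo
  "Connects two sources to one sink without close propagation.
  Returns whether the sink was still open after the first source closed."
  []
  (let [a    (stream/stream)
        b    (stream/stream)
        sink (stream/stream 10)]
    (stream/connect a sink {:downstream? false})
    (stream/connect b sink {:downstream? false})
    (stream/close! a)
    ;; Neither connection propagates close, so closing `a` leaves sink open
    ;; and `b` can keep writing to it.
    (let [open-after-a? (not (stream/closed? sink))]
      (deref (stream/put! b :late) 1000 :timeout)
      (stream/close! b)
      ;; Both sources are done, so the sink has to be closed by hand.
      (stream/close! sink)
      open-after-a?)))

(manual-close-demo)
;; => true
```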

yenda commented 8 months ago

@KingMob ok, thanks for the feedback. Setting downstream? to false is indeed how I worked around the issue, and I suspected I'd have to implement a manual count of upstream channels to be able to close properly.

yenda commented 8 months ago

upstream? | Whether closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to false

downstream? | Whether closing the source will close the sink

This is what the doc says. The upstream? param tells you it closes even if there are other sinks downstream, but the downstream? param doesn't tell you whether it closes even if there are other sources upstream.

It is an issue that only materializes when you have more than one source AND an imbalance in the puts. I don't think it is a sane default: it loses data in a way that upstream? doesn't, because if you close the upstream source you cut off the messages to every downstream at once. If you close a downstream that has more than one upstream, then at the end of the execution it will have derived messages from less input than other streams that branched out earlier (basically what happens in my repro).

As a conclusion, I would still consider this an issue: even if the doc is improved to reflect the behavior more clearly, the behavior makes it harder to use the API. If it only closed the downstream when the closing upstream was the last open upstream, one could easily get the current behavior by using on-closed on the upstream, while in the current API, you can only obtain what I think is a more desirable behavior by maintaining a counter of open upstreams and closing the downstream when it reaches 0.

KingMob commented 7 months ago

> It is an issue that only materializes when you have more than one source AND an imbalance in the puts.

FYI, it would happen even if puts were perfectly balanced, because sibling streams don't know about each other. One of them could always close their shared sink before the other gets a chance to write its final message. This is due to the topology, and not batch or any property of the message distribution. It's easier to see the issue when batching, but the race exists regardless.
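
The topology problem can be sketched without batch at all. In this minimal example (the name `sibling-close-demo` is hypothetical), two sources share a sink with the default connect options, and only one of them is closed.

```clojure
(require '[manifold.stream :as stream])

(defn sibling-close-demo
  "Connects two sources to one sink with default options and closes only `a`.
  Returns true if the sink was closed within one second, else :still-open."
  []
  (let [a      (stream/stream)
        b      (stream/stream)
        sink   (stream/stream 10)
        closed (promise)]
    (stream/connect a sink)   ; :downstream? defaults to true
    (stream/connect b sink)
    (stream/on-closed sink #(deliver closed true))
    ;; `b` is untouched and may still have messages to deliver, but the
    ;; close of `a` propagates to the shared sink anyway.
    (stream/close! a)
    (deref closed 1000 :still-open)))

(sibling-close-demo)
```

No message counts are involved: whichever sibling closes first takes the shared sink with it.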

> it loses data in a way that upstream? doesn't

upstream? true has similar undesirable consequences when one source has multiple sinks, though. If one sink closes, it cuts off a sibling sink from its data, even if the sibling wanted the upstream source to continue.

> As a conclusion, I would still consider this an issue: even if the doc is improved to reflect the behavior more clearly, the behavior makes it harder to use the API.

PRs welcome. You can't change any default behavior without breaking stuff, but it might be possible to add keywords that change how downstream?/upstream? are interpreted. (Though going upstream may be trickier.) Doc improvements also welcome. I'll update the issue name, and reopen it.

> in the current API, you can only obtain what I think is a more desirable behavior by maintaining a counter

You shouldn't need a counter, btw. Fns like drain-into, connect-via, and consume-async will provide deferreds that return when the source is done. (drain-into is essentially a connect that returns a closing deferred) You can then use deferred/zip and deferred/alt on those deferreds to get desired any-closed/all-closed behaviors.
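
A minimal sketch of the all-closed variant, assuming `drain-into` leaves its destination open and returns a deferred that is realized once its source is drained. The helper names `drain-all-into` and `run-demo` are hypothetical, not part of Manifold.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as stream])

(defn drain-all-into
  "Hypothetical helper: drains every source into sink and closes sink only
  after all sources are drained. Returns a deferred for that final close."
  [sources sink]
  (d/chain (apply d/zip (map #(stream/drain-into % sink) sources))
           (fn [_] (stream/close! sink))))

(defn run-demo []
  (let [a    (stream/stream)
        b    (stream/stream)
        sink (stream/stream 10)
        done (drain-all-into [a b] sink)]
    (stream/put! a 1)
    (stream/close! a)   ; sink stays open because b is not drained yet
    (stream/put! b 2)
    (stream/close! b)
    @done               ; realized after both sources are drained
    (stream/stream->seq sink)))

;; Should return both messages, since the sink closes only after the last source.
(run-demo)
```

Swapping `d/zip` for `d/alt` in the helper would give the any-closed behavior instead, closing the sink as soon as the first source is drained.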

yenda commented 7 months ago

> You shouldn't need a counter, btw. Fns like drain-into, connect-via, and consume-async will provide deferreds that return when the source is done. (drain-into is essentially a connect that returns a closing deferred) You can then use deferred/zip and deferred/alt on those deferreds to get desired any-closed/all-closed behaviors.

Thank you! I didn't think of using the deferred at all and it solves my issue nicely.

Eg for the repro above:

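(require '[manifold.deferred :as d]
         '[manifold.stream :as stream])

(do
  (defn st []
    (let [cnt (atom 0)
          s   (stream/periodically 500 (fn [] (swap! cnt inc)))
          s1  (stream/stream)
          sb  (stream/stream)
          s2  (stream/stream)
          s3  (stream/stream)]

      (stream/connect sb s1)
      (stream/connect sb s2)

      (stream/connect (->> s
                           ;; close the source on the fifth tick so the repro is deterministic
                           (stream/map (fn [v] (when (= 5 @cnt) (stream/close! s)) v))
                           (stream/batch 2 1000)
                           (stream/map (fn [v] (println :s->sb v) v)))
                      sb)

      ;; drain-into does not close s3 when its source is drained; it returns a
      ;; deferred instead, so s3 can be closed once both sources are done.
      (d/chain (d/zip' (stream/drain-into (->> s2
                                               (stream/batch 2 1000)
                                               (stream/map (fn [v] (println :s2->s3 v) v)))
                                          s3)
                       (stream/drain-into (->> s1
                                               (stream/map (fn [v] (println :s1->s3 v) v)))
                                          s3))
               ;; d/chain passes the zipped result to the callback, so it must
               ;; take one argument. s1 and s2 are already closed at this
               ;; point, so the shared sink s3 is the stream left to close.
               (fn [_] (stream/close! s3)))

      (stream/consume (fn [v] (println :s3 v) v) s3)

      (println "====================")
      [s sb s1]))
  (def ss (st))
  ss)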

This always yields the correct result.