leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

Lazify `ap` #109

Open leonoel opened 5 months ago

leonoel commented 5 months ago

ap should evaluate synchronous continuations on transfer.

Desired semantics

When an ap evaluation context resumes, i.e. a ? task terminates or a ?>/?< flow is ready to transfer, the continuation is evaluated eagerly if and only if it has a branch containing an asynchronous operator (? or ?>). Otherwise, the process becomes ready to transfer and the continuation is run lazily on transfer.
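To make the rule concrete, here is a minimal sketch of the two kinds of continuation. The names sync-continuation and async-continuation are hypothetical and used for illustration only. Both flows produce the same values today; the proposal would only change when the first body is evaluated.

```clojure
(require '[missionary.core :as m])

;; The continuation after ?> is purely synchronous: no branch contains ? or ?>.
;; Under the proposal it would be run lazily, on transfer.
(defn sync-continuation [input]
  (m/ap (inc (m/?> input))))

;; The continuation after ?> contains an asynchronous operator (?).
;; Under the proposal it would still be evaluated eagerly when ap resumes.
(defn async-continuation [input]
  (m/ap (let [x (m/?> input)]
          (m/? (m/sleep 10 (inc x))))))

(m/? (m/reduce conj (sync-continuation (m/seed [1 2 3]))))
;; => [2 3 4]
(m/? (m/reduce conj (async-continuation (m/seed [1 2 3]))))
;; => [2 3 4]
```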

Motivations

ap's current evaluation rules make some patterns inherently unsafe because data can be lost.

An example of such a use case is the problem discussed in this Slack thread: https://clojurians.slack.com/archives/CL85MBPEF/p1713457348286889. The goal is to partition an input flow into batches constrained by both a maximum batch size and a maximum delay between the first and the last element of the batch.

The proposed solution, while functional and elegant, has a subtle issue.

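(require '[missionary.core :as m])

(defn batch [max-size max-delay input]
  ;; ({} x) is always nil, so all values share a single group key
  (m/ap (let [[_ input] (m/?> (m/group-by {} input))]
          ;; a batch ends after max-size items or when ::timeout arrives
          (m/? (m/reduce conj
                 (m/eduction (take-while (complement #{::timeout})) (take max-size)
                   (m/ap (m/amb= (m/?> input) (m/? (m/sleep max-delay ::timeout))))))))))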

An input value may be skipped when the scheduler thread races with the thread producing this value. The data loss can be observed consistently with this test, which sums the successive batch sizes:

((m/reduce (fn [n b] (+ n (count b))) 0
   (batch 4 15
     (m/ap
       (let [x (m/?> (m/seed (range 10000)))]
         (m/? (m/via m/blk (Thread/sleep (rand 10)) x))))))
 prn prn)
;; eventually prints a number less than 10000

The data loss occurs in the inner ap, under the following scenario:

  1. T1 (scheduler thread) resolves the sleep, ap is ready to transfer ::timeout, the eduction stage is notified, ap is transferred immediately and the transducer pipeline terminates due to take-while.
  2. T2 (producer thread) makes a new value available on the group consumer and wins the race against T1 trying to cancel ap. The internal output buffer is empty, so the group consumer is transferred immediately, ap is now ready to transfer the value and the eduction stage is notified again.
  3. T1 cancels ap and flushes remaining values. The group consumer is now cancelled, but the first value of the next batch has already been transferred to the internal output buffer and will therefore be discarded.

What should have happened instead:

Accidental benefits

If ap evaluates ?< lazily then it's strictly more powerful than cp. Therefore, cp can be deprecated.
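As a rough illustration of the overlap, the sketch below defines the same body once with cp and once with ap. The names !x, with-cp and with-ap are hypothetical. It assumes, as this issue describes, that ap currently evaluates the continuation eagerly when it resumes.

```clojure
(require '[missionary.core :as m])

(def !x (atom 0))

;; cp: the body is evaluated lazily, when the consumer samples the flow.
(def with-cp (m/cp (inc (m/?< (m/watch !x)))))

;; ap: same body. As described in this issue, the continuation currently runs
;; eagerly when ap resumes. Under the proposal it would be deferred to
;; transfer, because it contains no ? or ?>.
(def with-ap (m/ap (inc (m/?< (m/watch !x)))))
```

If the proposal is implemented, the two definitions would be expected to behave alike for a body like this one, which is what makes deprecating cp plausible.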

Chesterton's fence

The current evaluation semantics are mainly a consequence of cloroutine's design. Cloroutine doesn't expose any information about the continuation, so the only possible way to figure out if the final result can be computed synchronously is to actually try to compute it.

Implementation strategy

Unknown.

awwx commented 5 months ago

I was curious if this might be related (or not)...

(defn take-none [f]
  (m/eduction (take 0) f))

(defn echo [f]
  (m/ap
   (let [v (m/?> f)]
     (prn 'produce v)
     v)))

(m/?
 (m/reduce (fn [_ _]) nil
   (take-none (echo (m/seed (range))))))
produce 0
produce 1

The echo/seed combination gets two values ahead of the consumer before being cancelled.

Now let's add in another layer:

(defn copy [f]
  (m/ap (m/?> f)))

... (take-none (copy (echo (m/seed (range)))))
produce 0
produce 1
produce 2
... (take-none (copy (copy (echo (m/seed (range))))))
produce 0
produce 1
produce 2
produce 3

Each added copy buffers an additional value, so each copy we add causes the producer to get an additional value ahead of the consumer.

Suppose this issue were implemented. In copy, the branch doesn't contain an asynchronous operator (it does nothing but return), so the continuation would be evaluated lazily. Would that mean copy no longer buffers an additional value in a pipeline, as it does now?

leonoel commented 5 months ago

Yes, when this issue is solved, copy will be semantically equivalent to identity.