clj-commons / manifold

A compatibility layer for event-driven abstractions

Failure when using transform & transducers that accumulate values #143

Closed wneirynck closed 3 years ago

wneirynck commented 6 years ago

I have a stream that contains strings. These strings are actually multiline events, separated by empty lines. So I've created a transducer that accumulates all values until it encounters an empty string, then emits the accumulated result and resets:

(defn join-non-empty
  "Creates a transducer that joins all consecutive non-empty strings in the sequence into one."
  [rf]
  (let [iv (volatile! "")]
    (fn
      ([] (rf))
      ([r]
       (let [c @iv]
         (if (empty? c)
           (rf r)
           (rf r c))))
      ([r i]
       (let [c @iv]
         (if (empty? i)
           ;; Empty string encountered: emit the accumulated result and reset
           (do
             (vreset! iv "")
             (rf r c))
           ;; Non-empty string: accumulate
           (do
             (vreset! iv (str c i))
             ;; Bug in Manifold: fails on regular nil return
             (d/future nil))))))))

The last line is required; otherwise an exception occurs:

11:00:39.788 ERROR [graph]: error in message propagation
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.concurrent.Future
    at clojure.core$deref_future.invokeStatic(core.clj:2206)
    at clojure.core$deref.invokeStatic(core.clj:2228)
    at clojure.core$deref.invoke(core.clj:2214)
    at manifold.stream.graph$sync_connect$f24875auto__26024$fn26025.invoke(graph.clj:253)
    at manifold.stream.graph$sync_connect$f24875auto__26024.invoke(graph.clj:252)
    at clojure.lang.AFn.run(AFn.java:22)
    at io.aleph.dirigiste.Executor$3.run(Executor.java:308)
    at io.aleph.dirigiste.Executor$Worker$1.run(Executor.java:62)
    at manifold.executor$thread_factory$reify24757$f__24758.invoke(executor.clj:36)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)

When using the transducer (without the d/future line) in a regular sequence, it works without problems.

Example of failing code:

(manifold.stream/transform join-non-empty ["a" "b" ""])

This code works:

(sequence join-non-empty ["a" "b" "" "c" "d"])

wneirynck commented 6 years ago

After some more investigation, I've found that the issue also disappears if I return r instead of the future on the last line of the 2-arity transducer function. The transducer specs do not really specify that this is the required behaviour, so that may be confusing. So you can either just drop this issue, or make the Manifold code more robust to handle nil returns.

KingMob commented 3 years ago

The transducer docs could be clearer, but yes, the 2-arity form is the main reduction and should always return the accumulated result so far. See the dedupe, distinct, and partition-by transducers for similar examples.

In this case, failing to return the unchanged result only appears to be working because the output is totally independent of the accumulated result so far.

Given that this is an invalid transducer, I'm going to close the issue. FWIW, I can't replicate the bug on Manifold 0.1.9-alpha4 and Java 15, so hopefully it's not still a problem, regardless.
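
For anyone landing here later, a minimal sketch of the transducer rewritten to follow the contract described above: the 2-arity step returns the accumulated result unchanged while buffering, and the 1-arity completion flushes any leftover buffer before completing, in the style of partition-by. This is an illustration using only clojure.core, not code from the Manifold project; the local names acc, pending, result, and input are chosen here for readability.

```clojure
(defn join-non-empty
  "Transducer that joins consecutive non-empty strings into one,
  emitting the joined value each time an empty string is seen."
  [rf]
  (let [acc (volatile! "")]
    (fn
      ;; init: delegate to the downstream reducing function
      ([] (rf))
      ;; completion: flush whatever is still buffered, then complete
      ([result]
       (let [pending @acc]
         (vreset! acc "")
         (if (empty? pending)
           (rf result)
           (rf (unreduced (rf result pending))))))
      ;; step
      ([result input]
       (if (empty? input)
         ;; empty string: emit the buffered value and reset the buffer
         (let [pending @acc]
           (vreset! acc "")
           (rf result pending))
         ;; non-empty string: buffer it and return the accumulated
         ;; result unchanged (never nil)
         (do
           (vswap! acc str input)
           result))))))

(into [] join-non-empty ["a" "b" "" "c" "d"])
;; => ["ab" "cd"]
```

Like the version in the original report, this emits an empty string when two empty strings arrive in a row. Since the step function no longer returns nil or a future, it should not depend on how a particular transducing context treats the step's return value, so the d/future workaround is not needed.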