leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

DOC: RxJava comparison #32

Closed xificurC closed 3 years ago

xificurC commented 3 years ago

This is a WIP, sharing with you so that you can comment and let me know if you like what you see and want me to finish walking through the guide.

leonoel commented 3 years ago

Nice job! I also have a WIP Rx guide, but it looks more like a cheatsheet; I think it could be merged into yours with little overlap. A few remarks:

xificurC commented 3 years ago

Thanks! Feel free to plug in your cheatsheet somewhere.

I rebased and force pushed some more changes, taking into consideration your feedback.

xificurC commented 3 years ago

I finished the InfoQ guide in the last commit, but have some pending questions:

(m/? (m/aggregate (fn [_ _] (println (java.util.Date.))) nil (m/transform (take 20) clock)))
#inst "2021-01-04T15:10:13.728-00:00"
#inst "2021-01-04T15:10:14.728-00:00"
#inst "2021-01-04T15:10:15.730-00:00"
#inst "2021-01-04T15:10:16.731-00:00"
#inst "2021-01-04T15:10:17.732-00:00"
#inst "2021-01-04T15:10:18.734-00:00"
#inst "2021-01-04T15:10:19.735-00:00"
#inst "2021-01-04T15:10:20.736-00:00"
#inst "2021-01-04T15:10:22.729-00:00"
#inst "2021-01-04T15:10:25.729-00:00"
#inst "2021-01-04T15:10:28.731-00:00"
#inst "2021-01-04T15:10:31.731-00:00"
#inst "2021-01-04T15:10:34.732-00:00"
#inst "2021-01-04T15:10:36.739-00:00"
#inst "2021-01-04T15:10:37.740-00:00"
#inst "2021-01-04T15:10:38.741-00:00"
#inst "2021-01-04T15:10:39.741-00:00"
#inst "2021-01-04T15:10:40.742-00:00"
#inst "2021-01-04T15:10:41.743-00:00"
#inst "2021-01-04T15:10:42.744-00:00"

xificurC commented 3 years ago

I have found the RxJava readme provides additional interesting examples that could be compared, so I went ahead and included those as well. At this point there's a lot of content to look at so I'll stop and wait for you to catch up and correct my mistakes.

leonoel commented 3 years ago

I defined an interval function but it doesn't seem to do what I expect. Pulling 3 values from a 1-second-producing flow always takes 2 seconds.

Your interval implementation emits the first tick immediately, which is why pulling 3 items from a 1-second interval takes 2 seconds. Here's how you could fix it:

(defn interval [ms]
  (m/ap
    (loop []
      (if (m/?? (m/enumerate [true false]))
        (m/? (m/sleep ms :emit))
        (recur)))))

Or another approach, which I would consider more idiomatic:

(defn interval [ms]
    (m/ap (m/? (m/sleep (m/?? (m/enumerate (repeat ms))) :emit))))

Note that both implementations are fixed-delay clocks, which means they can drift over time. If you want a fixed-rate clock (non-drifting), you could implement it like this:

(defn interval-fixed-rate [ms]
    (m/ap (m/? (m/sleep (- (m/?? (m/enumerate (next (iterate (partial + ms) (System/currentTimeMillis)))))
                           (System/currentTimeMillis)) :emit))))

I'm not sure what kind of clock Rx interval is.

I didn't know how to emit no values in an ap block in the :done branch. I used (m/?? m/none), is that correct? Is there a better way?

(m/?? m/none) is the idiomatic way for now, but once the amb operators are released, (amb?) will be better. Feel free to refer to amb operators in your guide, I'm pretty sure they will be in the next release.
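
To illustrate the (m/?? m/none) idiom, here is a minimal sketch of an ap block that emits nothing in one branch. It uses the operator names from this thread (m/??, m/enumerate, m/aggregate); the name odds is hypothetical, and later missionary releases may spell these operators differently.

```clojure
(require '[missionary.core :as m])

;; Sketch: keep only the odd numbers of a flow.
;; In the branch that should emit nothing, fork on the empty flow m/none,
;; which ends that branch without producing a value.
(def odds
  (m/ap
    (let [x (m/?? (m/enumerate (range 10)))]
      (if (odd? x)
        x
        (m/?? m/none)))))

;; Collect every emitted value into a vector.
(m/? (m/aggregate conj odds))
```

Only the odd inputs should reach the aggregate, since each even input ends its branch on the empty flow.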

xificurC commented 3 years ago

Hi Leo, thank you for your update.

The interval from RxJava is indeed a fixed-delay clock. After subscribing it can have an initial wait time after which it produces a new value every ms. Interestingly it generates a value every time, disregarding backpressure. So your solution isn't exactly the same but suffices for the comparison.

I fail to understand one thing from your fixed-rate clock - if I (def sec (interval-fixed-rate 1000)) and e.g. take 5 values, then do nothing in the REPL for 5 seconds and take 5 more values I still wait 1 second per each new value. If I'm reading the code correctly (iterate (partial + ms) (System/currentTimeMillis)) should be a lazy sequence of X,X+1s,X+2s,X+3s,.. where X=(System/currentTimeMillis). If I waited 5 secs in the REPL the next value from that lazy sequence should produce a sleep with a negative value. Obviously I'm misreading something, because the end result isn't so.

Feel free to refer to amb operators in your guide, I'm pretty sure they will be in the next release.

I'll update the docs when you release them. I'm not sure what names you will pick in the end :)

xificurC commented 3 years ago

The InfoQ comments section links to another guide that looks really well written. That would take way too much of my time to walk through though.

leonoel commented 3 years ago

The interval from RxJava is indeed a fixed-delay clock.

After a quick look at the source, it turns out the implementation clearly exhibits fixed-rate behavior, so the documentation is misleading.

Interestingly it generates a value every time, disregarding backpressure. So your solution isn't exactly the same but suffices for the comparison.

That's right. For completeness we should add a relieve stage immediately after the interval.
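
A rough sketch of what such a relieve stage could look like, reusing the fixed-delay interval from earlier in this thread. The name relieved-interval is hypothetical, and the choice of keeping only the latest tick when downstream is slow is an assumption, not necessarily what the guide ended up doing.

```clojure
(require '[missionary.core :as m])

;; Fixed-delay clock, as suggested earlier in this thread.
(defn interval [ms]
  (m/ap (m/? (m/sleep (m/?? (m/enumerate (repeat ms))) :emit))))

;; Assumption: m/relieve consumes upstream eagerly and merges values that
;; pile up while downstream is busy with the given function.
;; Here the merge keeps only the most recent tick.
(defn relieved-interval [ms]
  (m/relieve (fn [_ latest] latest) (interval ms)))

;; Pull two ticks from the relieved clock.
(m/? (m/aggregate conj (m/transform (take 2) (relieved-interval 10))))
```

With this stage in place the clock keeps ticking whether or not the consumer is ready, which is closer to the Rx behavior described above.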

I fail to understand one thing from your fixed-rate clock - if I (def sec (interval-fixed-rate 1000)) and e.g. take 5 values, then do nothing in the REPL for 5 seconds and take 5 more values I still wait 1 second per each new value. If I'm reading the code correctly (iterate (partial + ms) (System/currentTimeMillis)) should be a lazy sequence of X,X+1s,X+2s,X+3s,.. where X=(System/currentTimeMillis). If I waited 5 secs in the REPL the next value from that lazy sequence should produce a sleep with a negative value. Obviously I'm misreading something, because the end result isn't so.

Your understanding is correct. If downstream can't keep up, the interval clock will start to drift and decrease sleep duration in order to keep in sync with input rate. If it's more than a cycle late, sleep will get a negative value. That's OK, the effective sleep duration will be limited to a minimal value, host-dependent but always positive (unfortunately !).
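
The arithmetic behind this can be checked without missionary at all. The sketch below uses hypothetical helper names (deadlines, sleep-for) and made-up clock readings to show how the requested sleep goes negative when the consumer falls more than a cycle behind.

```clojure
;; Pure arithmetic behind the fixed-rate clock; no missionary required.
;; `deadlines` and `sleep-for` are hypothetical helper names.

(defn deadlines
  "Lazy seq of absolute tick times: start+period, start+2*period, ..."
  [start period]
  (next (iterate (partial + period) start)))

(defn sleep-for
  "Sleep duration requested for `deadline` when the clock reads `now`."
  [deadline now]
  (- deadline now))

;; Consumer keeps up: each tick is requested right at the previous deadline.
(def on-time
  (map sleep-for (take 3 (deadlines 0 1000)) [0 1000 2000]))
;; => (1000 1000 1000)

;; Consumer stalls until t=6000 after the first tick: the deadlines stay on
;; the original grid, so the requested sleeps are negative while catching up.
(def stalled
  (map sleep-for (take 4 (deadlines 0 1000)) [0 6000 6000 6000]))
;; => (1000 -4000 -3000 -2000)
```

The negative durations are what get clamped to the host-dependent minimum mentioned above.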

xificurC commented 3 years ago

Sorry for the long pause, I got a bit too busy at work.

I mistyped, the documentation showed it to be a fixed-rate clock. Apologies.

Your understanding is correct. If downstream can't keep up, the interval clock will start to drift and decrease sleep duration in order to keep in sync with input rate.

What am I doing wrong then?

(defn interval [ms]
  (m/ap (m/? (m/sleep (- (m/?? (m/enumerate (next (iterate (partial + ms) (System/currentTimeMillis)))))
                         (System/currentTimeMillis)) :emit))))

(let [secs (interval 1000)]
  (m/? (m/aggregate (fn [_ _] (println (java.util.Date.))) nil (m/transform (take 5) secs)))
  (println :sleep-for-5s)
  (Thread/sleep 5000)
  (m/? (m/aggregate (fn [_ _] (println (java.util.Date.))) nil (m/transform (take 5) secs))))
#inst "2021-03-03T16:28:58.595-00:00"
#inst "2021-03-03T16:28:59.594-00:00"
#inst "2021-03-03T16:29:00.595-00:00"
#inst "2021-03-03T16:29:01.594-00:00"
#inst "2021-03-03T16:29:02.594-00:00"
:sleep-for-5s
#inst "2021-03-03T16:29:08.597-00:00"
#inst "2021-03-03T16:29:09.597-00:00"
#inst "2021-03-03T16:29:10.598-00:00"
#inst "2021-03-03T16:29:11.597-00:00"
#inst "2021-03-03T16:29:12.597-00:00"

xificurC commented 3 years ago

I thought I'd be able to debug this myself but failed to do so :)

(ns missionary.test (:require [missionary.core :as m]))
(defmacro dbg {:style/indent 1} [label x] `(let [label# ~label x# ~x] (println label# x#) x#))
(defn interval [ms]
  (m/ap
   (m/?
    (m/sleep
     (dbg :diff
       (- (dbg :next
            (m/?? (m/enumerate (next (iterate (partial + ms) (System/currentTimeMillis))))))
          (dbg :curr
            (System/currentTimeMillis)))) :emit))))
(time
 (let [flow (interval 100)]
   (println '---)
   (dbg :got (m/? (m/aggregate conj nil (m/transform (take 2) flow))))
   (let [ms 200] (println "***" :sleep ms "***") (Thread/sleep ms))
   (dbg :got (m/? (m/aggregate conj nil (m/transform (take 2) flow))))))

prints

---
:next 1614891930110
:curr 1614891930011
:diff 99
:next 1614891930210
:curr 1614891930111
:diff 99
:next 1614891930310
:curr 1614891930211
:diff 99
:got (:emit :emit)
*** :sleep 200 ***
:next 1614891930513
:curr 1614891930413
:diff 100
:next 1614891930613
:curr 1614891930514
:diff 99
:next 1614891930713
:curr 1614891930614
:diff 99
:got (:emit :emit)
"Elapsed time: 605.126679 msecs"

My expectations are failing in 2 ways:

leonoel commented 3 years ago

there are 3 prints for each (take 2)

This is consistent with the execution model.

the :next after the sleep is as if the flow was just starting

Flow composition is referentially transparent. The flow returned by (interval 100) is a blueprint: it has no identity, and a fresh process is spawned each time the flow is run. As far as I understand, Rx works the same way; this problem is often described as "hot vs cold".

What you're observing is what I would expect. If that behavior is confusing to you, feel free to suggest better ways to explain it.
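
One way to see the blueprint behavior in isolation is to count how many times a flow's body starts. This is only a sketch with hypothetical names (starts, numbers), using the operator names from this thread.

```clojure
(require '[missionary.core :as m])

;; Counts how many times the flow body has been started.
(def starts (atom 0))

;; A flow is a value describing a process; nothing runs when it is defined.
(def numbers
  (m/ap
    (swap! starts inc)               ; runs once per run of the flow
    (m/?? (m/enumerate [1 2 3]))))

;; Each run spawns a fresh process from the same blueprint.
(def first-run  (m/? (m/aggregate conj numbers)))
(def second-run (m/? (m/aggregate conj numbers)))
```

Both runs yield the same values and the counter ends at 2, which is the same reason the second (take 5) above starts a brand new clock instead of resuming the first one.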

xificurC commented 3 years ago

Hi Leo,

thank you again for the explanation. I always feel a bit dumb that I only get it once you explain it in detail, even though it's not that complicated.

What I didn't understand was why the third value's computation is started when take only wants 2 values. So I tried to write the whole thing out, using a single flow only:

(defn ms [] (System/currentTimeMillis))
(time (do (println '---)
          (let [secs (interval 100)
                q (java.util.concurrent.LinkedBlockingQueue. 10)
                flow (secs #(.put q :ready) #(println :die))]                 ;(1)
            (dotimes [_ 2] (when (.take q) (dbg [:got (ms)] @flow)))          ;(2)
            (let [ms 200] (println "***" :sleep ms "***") (Thread/sleep ms))  ;(3)
            (dotimes [_ 2] (when (.take q) (dbg [:got (ms)] @flow))))))       ;(4)

The output of this is

---
:next 1615236298617
:curr 1615236298517
:diff 100
:next 1615236298717
:curr 1615236298619
:diff 98
[:got 1615236298618] :emit
:next 1615236298817
:curr 1615236298718
:diff 99
[:got 1615236298717] :emit
*** :sleep 200 ***
:next 1615236298917
:curr 1615236298919
:diff -2
[:got 1615236298919] :emit
:next 1615236299017
:curr 1615236298920
:diff 97
[:got 1615236298920] :emit
"Elapsed time: 404.252384 msecs"

After writing this out I understand why there were 3 prints for (take 2).

With this settled I'll be able to look at the guide again and fill in any missing pieces. I will let you know once I think it's ready.

xificurC commented 3 years ago

Hi Leo,

I added a note about RxJava Intervals not doing backpressure. Apart from that I found no obvious issues with the tutorial, so I consider it finished and leave it up to you to merge or request further changes. Thanks for all the help!

leonoel commented 3 years ago

Well, I did not expect you to write that much, and I really like what you came up with. I don't see anything to change; it makes sense for me to publish it as is. Feel free to add your name as single author, I'll publish my cheatsheet later as a separate document. Thank you again for the hard work!

xificurC commented 3 years ago

Thank you for the kind words. It was a nice way to learn about the API and internals. I don't want my name in the tutorial, if anyone cares they can check it in the commits. Thanks for your help while writing it, it was fun!