kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License

Proposal to add `fromProperty` static function #280

Closed: ivan-kleshnin closed this issue 6 years ago

ivan-kleshnin commented 6 years ago

I find myself wanting to throttle/debounce properties quite often. One example where it seems absolutely necessary is SSR: you need to wait until your state somehow stabilizes before taking a snapshot of it for rendering.

1) state$.throttle() / state$.debounce() don't fit because the initial value passes through these time-based gates immediately, unaffected by the timer. I mentioned that here: https://github.com/kefirjs/kefir/issues/278

2) state$.changes().throttle(), etc. don't fit because they lose the initial value completely.

I propose to add the following function to the standard toolkit:

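// Re-emit the property's current value on the next tick (so it is no longer
// "current" and time-based operators treat it like any other value),
// then pass all later changes through.
function fromProperty(prop$) {
  return K.merge([K.later(0, prop$.take(1)).flatMap(), prop$.skip(1)])
}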

Demo case:

let K = require("kefir")

let makeStore = (seed, action$) => {
  return action$
    .scan((z, fn) => fn(z), seed)
    .skipDuplicates()
}

let action$ = K.later(250, z => z + 1) // also try: K.never()

let state$ = makeStore(1, action$)

function fromProperty(prop$) {
  return K.merge([K.later(0, prop$.take(1)).flatMap(), prop$.skip(1)])
}

fromProperty(state$)
  .throttle(500, {leading: false}) // ---2| or ---1|
  .log("state$") // both async and initial values are throttled / debounced properly

mAAdhaTTah commented 6 years ago

I'd have to think about this a bit more, but it certainly seems a bit weird to me that you wouldn't want to get the current value immediately. Then it's not really a "Property" because the whole point of a Property is its current value. If you debounce its current value, it's not really "current" anymore.

FWIW, the implementation of that can probably be simplified by using withHandler instead of creating all those child streams to combine everything.

mAAdhaTTah commented 6 years ago

(I'm assuming a lot about your use case, so please correct me where I'm wrong.)

Thinking about this more, if you're determining that your state has stabilized as a function of time, it feels a bit Doing It Wrong™. Expressing it as "state has stabilized after receiving no updates for X ms" doesn't capture the relationship between a "stable state" & whatever other tasks are responsible for stabilizing it. You're expressing the relationship implicitly through time, rather than directly through the streams themselves.

I would approach the issue one of two ways (I'm assuming your approach is Redux-like, which I have a decent amount of experience in):

  1. Express the dependency directly via flatMap and friends. When all of them end, emit the single value that results.
  2. Express the dependency through the state itself, e.g. if you have 4 tasks that need to finish before emitting the state, keep state.tasks.completed = 0, increment it as each task completes, and then do something like:

    state$.filter(state => state.tasks.completed >= 4).take(1)

    would express that as well.

Thoughts?

ivan-kleshnin commented 6 years ago

Thanks for the discussion! 👍

Then it's not really a "Property" because the whole point of a Property is its current value. If you debounce its current value, it's not really "current" anymore.

The function is called fromProperty, not toProperty. You say "I'm losing the features of a property", and yes, that's exactly my goal. What I get from this function is just a stateful stream (as opposed to a property) which is affected by time. Such streams are at the core of RxJS and MostJS, and it's hard to argue they are all doing it wrong.

Thinking about this more, if you're determining that your state has stabilized as a function of time, it feels a bit Doing It Wrong™. Expressing it as "state has stabilized after receiving no updates for X ms" doesn't capture the relationship between a "stable state" & whatever other tasks are responsible for stabilizing it. You're expressing the relationship implicitly through time, rather than directly through the streams themselves.

I don't think it's necessarily wrong. It's just the worst case, for when you don't have better metrics to depend upon. Think of a scraping script: it should stop when it has visited all links on all pages. You can't (or wouldn't want to) have an event for "links have ended", because negative events (like "something DIDN'T happen", the absence of events) are pretty tough to express (or emulate) in general.

So you stop this scraper on some debounce event: that's simple and works pretty well (in my experience with such systems). I write a lot of reactive code which is not SPA, and that's, at least, my point of view.

For the record, my SSR looks like

 K.fromProperty(sinks.state$)
      .skipWhile(hasLoadingApps)
      .merge(timeoutError(1000))
      .take(1)
      .takeErrors(1)

– almost identical to the way you described it... The thing is – if I want to add:

let finalState$ = K.fromProperty(sinks.state$)
+     .throttle(50)
      .skipWhile(hasLoadingApps)
      .merge(timeoutError(1000))
      .take(1)
      .takeErrors(1)

I need to have fromProperty there.

I would approach the issue one of two ways (I'm assuming your approach is Redux-like, which I have a decent amount of experience in):

state$.filter(state => state.tasks.completed >= 4).take(1)

This code will pick the initial state unless the tasks are started synchronously. Or you have to start with some "blocker" task, but WHO will unblock it and WHEN?! That's where the simple solutions end.

I know there are ways to couple the system so that it's 100% declarative. The thing is, sometimes the declarative (reactive) approach is the worst of the alternatives, not the best. In this case I weighed the pros and cons and decided in favor of throttle and simplicity over declarativity. My decision may not be the best, of course, but it's rational.

P.S.

Imperative code describes a sequence of commands best; that's why Haskell has do-notation. Writing the same thing as a series of lambdas (what do-notation desugars to) looks and reads absolutely terrible.

Something similar happens with reactive code in some cases I can't yet describe formally, but I have certainly experienced it. For example, try to express something like this:

state.set("loading.x", true)
let x = await fetch(f(q))
state.set("loading.x", false)

state.set("loading.y", true)
let y = await fetch(g(x))
state.set("loading.y", false)

state.set("loading.z", true)
let z = await fetch(g(y))
state.set("loading.z", false)

in a reactive manner. I did that and the results aren't pretty (despite being reactive). And if you try to isolate side-effects like here https://cycle.js.org/api/http.html it will tear your code apart. Given this, I'm even starting to reevaluate the position "Reactive is better by default" for myself.

Macil commented 6 years ago

Think of a scraping script. It should stop when it visited all links on all pages. You can't or won't like to have an event for "links are ended" because negative events (like "something DIDN'T happen", the absence of events) are pretty hard or hairy to express or emulate in general.

Kefir's end events fit that pretty well, as long as all of your changes are being delivered by streams that emit some values and then end when they're done. If you merge multiple streams (or flatMap one stream into many that are then merged together), then the merged stream doesn't end until all of its contained streams have ended. (Sorry if you're familiar with all of this and this is a bit of a tangent. I'm just a little surprised because I think that scenario is actually a great example of the strength of a more purely reactive solution, and it makes me suspicious that there are good solutions with the existing Kefir API to your main situation.) Here's an example of a mock scraping script that makes use of end events. If the scraping script instead ended by a debounce event or some kind of timer, then the script could end early if a single fetch took too long, or the script would wait unnecessarily long before determining that it's finished. This strategy here also has the bonus that unsubscribing from the stream returned by fetchRecursively automatically causes the scraping to end and even for the in-progress fetches to abort themselves if the fetch function is coded to abort on unsubscription.

This code will pick the initial state unless tasks are started synchronously. Or you should start with some "blocker" task – but WHO will unblock it and WHEN?! Then simple solutions end.

If you have a startTasks() function that returns a 1-item stream that emits once the tasks are started, then you could do this:

startTasks().flatMap(() => state$).filter(state => state.tasks.completed >= state.tasks.started).take(1)

About fromProperty specifically as proposed: it feels a bit like a hack in that it delays the first event in an arbitrary hard-coded way unlike other built-in Kefir functions. I don't expect to see setTimeout/later called in any Kefir functions that aren't specifically timing related. And doing it without the delay instead seems like a big change since as far as I know, Kefir chooses to treat synchronously-emitted values and a property's current value as the same concept. Maybe there could be an option to timing-related functions like throttle and debounce to make them not pass current events through immediately, but that's still pretty dirty (do all timing functions get that option?) and I'm not convinced at the moment that even if that feature were present that it would be the best solution to your situation.

ivan-kleshnin commented 6 years ago

@AgentME thanks, I'll consider your suggestions.

What can you say about the reactive version of the following:

state.set("loading.x", true)
let x = await fetch(f(q))
state.set("x", x)
state.set("loading.x", false)

state.set("loading.y", true)
let y = await fetch(g(x))
state.set("y", y)
state.set("loading.y", false)

state.set("loading.z", true)
let z = await fetch(g(y))
state.set("z", z)
state.set("loading.z", false)

I think it's pretty obvious what this code is meant to do, so I'll refrain from over-explaining. All my attempts to make a short and readable reactive implementation of this failed.

A cut version:

let fetchXStart$ = // a condition to start
let fetchXEnd$ = fetchXStart$.flatMapConcat(_ => K.fromPromise(
  A.get(`/api/.../`)
  .then(resp => resp.data[...])
  .catch(R.id)
))

let fetchYStart$ = // a condition to start
let fetchYEnd$ = fetchYStart$.flatMapConcat(_ => K.fromPromise(
  A.get(`/api/.../`)
  .then(resp => resp.data[...])
  .catch(R.id)
))

let fetchZStart$ = // a condition to start
let fetchZEnd$ = fetchZStart$.flatMapConcat(_ => K.fromPromise(
  A.get(`/api/.../`)
  .then(resp => resp.data[...])
  .catch(R.id)
))

let action$ = K.merge([
  fetchXStart$.map(_ => s => setLoading(s)),
  fetchYStart$.map(_ => s => setLoading(s)),
  fetchZStart$.map(_ => s => setLoading(s)),

  fetchXEnd$.map(_ => s => unsetLoading(s)),
  fetchYEnd$.map(_ => s => unsetLoading(s)),
  fetchZEnd$.map(_ => s => unsetLoading(s)),

  fetchXEnd$.map(d => s => setData(d, s)),
  fetchYEnd$.map(d => s => setData(d, s)),
  fetchZEnd$.map(d => s => setData(d, s)),
])

Now imagine someone explaining to their colleague how reactivity helps with such an example... The imperative one was WAY more readable.

mAAdhaTTah commented 6 years ago

The function is called fromProperty, not toProperty. You say "I'm losing the features of property" and yes, that's exactly my goal. What I'm getting of this function is just a stateful stream (vs property) which is affected by time. Such streams are at the core of RxJS and MostJS and it's hard to argue they all are doing it wrong.

Fair enough; as a minor nit, I probably wouldn't call it fromProperty, as I don't see that as descriptive, but that's beside the point. FWIW, MostJS doesn't have the concept of properties by default (they have an external package called @most/hold that would produce something similar). Quick testing suggests debouncing RxJS's BehaviorSubject would debounce its initial value, so there is that.


As for your imperative example, there are a couple of things. First, in your reactive version you're handling errors on the Promise and transforming its result, neither of which the imperative version does. There are also three separate start streams, whereas the imperative version presumes the steps happen in order. So the comparison is not quite apples-to-apples. I'd probably do it more like this (assuming we can emit functions to set state):

let start$ = // a condition to start
const action$ = start$.flatMap(() => K.concat([
    K.constant(s => setLoading(s)),
    K.fromPromise(A.get('/api/...')).map(d => s => setData(d, s)),
    K.constant(s => unsetLoading(s)),
    K.constant(s => setLoading(s)),
    K.fromPromise(A.get('/api/...')).map(d => s => setData(d, s)),
    K.constant(s => unsetLoading(s)),
    K.constant(s => setLoading(s)),
    K.fromPromise(A.get('/api/...')).map(d => s => setData(d, s)),
    K.constant(s => unsetLoading(s)),
]));

which actually reads pretty closely to the imperative version. You get some error-handling niceties doing it this way, but it's ultimately not that different. I followed your initial reactive example, but we could also pass setLoading directly, rather than wrapping it in an arrow, and if we curried setData, we could do the same thing there, making the whole thing point-free:

let start$ = // a condition to start
const action$ = start$.flatMap(() => K.concat([
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
]));

which I like a lot.

Whether it's advantageous to write it this way vs imperative depends more on the overall architecture of your application. A lot of Observables' power comes from an "Observables all the way down" mindset; if you're just using them here and there, you're probably going to find the burden not worth it.


All of that said, we're getting away a bit from the salient question: Should the current event of a Property be affected by time-based operators? If we say yes, that's 100% a breaking change, so we'd have to slate it for a hypothetical 4.0. I strongly dislike muddying up the API itself by adding an option to time-based operators, nor would I be in favor of adding a method to do this conversion, as it feels sloppy and unprincipled to say "we think this should emit synchronously, but if you don't, we'll give you a way out".

However, I introduced the thru method (and am working on the pipeline operator) for situations exactly like this; you could easily write a function to the effect of:

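const toTimeableProperty = s$ => s$.withHandler((emitter, event) => {
    switch (event.type) {
        case 'value':
            if (event.current) {
                // The current value arrives synchronously on subscription;
                // re-emit it on the next tick so downstream time-based
                // operators (debounce, throttle, ...) treat it like a change.
                setTimeout(emitter.value, 0, event.value)
            } else {
                emitter.value(event.value)
            }
            break;
        default:
            // Errors and end pass through unchanged. Caveat: if the source
            // ends synchronously, the end overtakes the deferred value.
            emitter.emitEvent(event);
            break;
    }
})
    .setName(s$, 'toTimeableProperty');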

const state$ = // create state Property

// and use it thus:
state$.thru(toTimeableProperty).debounce(200)

The behavior you want is definitely implementable in userland, so I'm even less inclined to think we need to include this in Kefir proper.


As another aside, I'm not really in love with how Cycle.js handles side effects. Losing the connection between input & output is pretty weird. I generally handle side effects by flatMap'ing over Observables that do the side effect, so the connection between input and output is retained. I think they do a lot of interesting things, but that's pretty core to how the framework functions and I think is a bit of a failing. You get this nice pure function at the cost of readability because you can't actually tell what happens to the request once it gets sent because it's handled elsewhere.

ivan-kleshnin commented 6 years ago

As another aside, I'm not really in love with how Cycle.js handles side effects. Losing the connection between input & output is pretty weird. I generally handle side effects by flatMap'ing over Observables that do the side effect, so the connection between input and output is retained. I think they do a lot of interesting things, but that's pretty core to how the framework functions and I think is a bit of a failing. You get this nice pure function at the cost of readability because you can't actually tell what happens to the request once it gets sent because it's handled elsewhere.

I agree 💯 percent. The initial promise of reactivity, to me, was to SEE causally related things in the same file, on neighboring lines of code. In CycleJS the cause and the effect are too often separated too far, which kinda defeats the point.

All of that said, we're getting away a bit from the salient question: Should the current event of a Property be affected by time-based operators?

I didn't say I want a timeable property in this thread. I wanted a way to convert a Kefir property into a stream whose initial value is handled differently than by changes(). Anyway, since we all come with different working solutions, and you guys were against the idea of having such a method in the library, I guess I can close the issue. I have no problem keeping it in the app.

let start$ = // a condition to start
const action$ = start$.flatMap(() => K.concat([
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
]));

which I like a lot.

Hmm, I also like this layout! Need to give it a go. Thank you!

mAAdhaTTah commented 6 years ago

I didn't say I want a timeable property in this thread. I wanted a way to convert Kefir's property to a stream with initial value being handled differently than in changes.

No, I know, and I understand the ask; I just don't think we should expose that in Kefir's API directly, but I do think it's worth exploring the more fundamental question, especially in light of RxJS's BehaviorSubject. I'm assuming you prefer Property's default behavior?


In CycleJS the cause and the effect are separated too far too often which kinda breaks the point.

Yeah, definitely prefer redux-observable over Cycle.js here. I did something similar in brookjs with Kefir as well, plus some things with React, redux, & Kefir, if you're interested.

ivan-kleshnin commented 6 years ago

I'm assuming you prefer Property's default behavior?

Yes, totally!

I did something similar in brookjs with Kefir as well, plus some things with React, redux, & Kefir, if you're interested.

I have to confess I hate everything about Redux. I like to dispatch functions, not actions. The latter feels very OOP-ish to me (the closed set of behaviors/actions/methods).

mAAdhaTTah commented 6 years ago

I like to dispatch functions, not actions.

Hah, to each their own; this feels uncontrolled to me, vs centralizing all the logic in the reducer.

The latter feels very OOP-ish to me (the closed set of behaviors/actions/methods).

This coupling is part of what I'm trying to solve. I don't love actions being coupled to the change they're supposed to induce, so I split them conceptually into "Events" (what just happened) & "Commands" (what should happen). If the reducer responds to Events but returns a [State, Command] tuple, then you get to centralize all the logic in (essentially) one giant pure function.

ivan-kleshnin commented 6 years ago

This coupling is part of what I'm trying to solve. I don't love actions being coupled to the change they're supposed to induce, so I split them conceptually into "Events" (what just happened) & "Commands" (what should happen). If the reducer responds to Events but returns a [State, Command] tuple, then you get to centralize all the logic in (essentially) one giant pure function.

Sounds interesting; I need to take a closer look at your project despite my prejudice against Redux.

let start$ = // a condition to start
const action$ = start$.flatMap(() => K.concat([
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
    K.constant(setLoading),
    K.fromPromise(A.get('/api/...')).map(setData),
    K.constant(unsetLoading),
]));

I've tried this one and I think it doesn't have the features of the original code. It provides the same sequence, yes, but it does not provide the data dependency I wanted:

let x = await fetch(f(q))
...
let y = await fetch(g(x)) // !! depends on x

The only way K.fromPromise(A.get('/api/...')).map(setData), and the next K.fromPromise(A.get('/api/...')).map(setData) can communicate in this version is through the state.

And that's not always a valid option. For example:

Request-1: fetch all model ids on some index page
Request-2: fetch those models by ids

In the second fetch you don't know WHICH ids were fetched unless you pass them explicitly between requests (as in both my versions). You cannot use state, because the resulting ids are merged with ids from previous requests and so are unrecognizable.

The second, similar problem is error passing. In your version I can short-circuit the whole sequence with an error event, but I cannot handle the error in the next handler (by catching it), unless I put it in the state and mark it in some unique way to make it recognizable for a particular command, which is totally unrealistic IMO.

mAAdhaTTah commented 6 years ago

The only way K.fromPromise(A.get('/api/...')).map(setData), and the next K.fromPromise(A.get('/api/...')).map(setData) can communicate in this version is through the state.

Maybe this?

let start$ = // a condition to start

const action$ = start$.flatMap(() => {
    const first$ = K.fromPromise(A.get('/api/...'));
    // toProperty() keeps the last value after the stream ends, so third$ can
    // still read second$'s result even though concat subscribes to it later.
    const second$ = first$.flatMap(res => K.fromPromise(A.get('/api/...', res))).toProperty();
    const third$ = second$.flatMap(res => K.fromPromise(A.get('/api/...', res)));

    return K.concat([
        // These shenanigans can also be wrapped in a function
        K.constant(setLoading),
        first$.map(setData),
        K.constant(unsetLoading),
        K.constant(setLoading),
        second$.map(setData),
        K.constant(unsetLoading),
        K.constant(setLoading),
        third$.map(setData),
        K.constant(unsetLoading),
    ]);
});

The second, similar problem is error passing. In your version I can shortcut the whole sequence with error event, but I can not handle event in the next handler (by catching), unless I put this event in the state and mark it in some unique way to make it recognizable for a particular command which is totally unrealistic i.m.o.

Not sure I get this. What's the comparable imperative version? I think I can put #flatMapErrors mostly wherever I need it in order to ensure this behaves any way I need it to.

ivan-kleshnin commented 6 years ago

You fixed both with this version. But you can see how it starts to drift away from the sequential layout, because promises/streams have no built-in scoping solution like async/await does. I admit this is still better than my version because everything looks more grouped, so I'll keep experimenting. Thank you!

mAAdhaTTah commented 6 years ago

If you like the imperative version better, you could also just do that:

const action$ = start$.flatMap(() => Kefir.stream(async emitter => {
    emitter.value(setLoading)
    let x = await fetch(f(q))
    emitter.value(unsetLoading)
    emitter.value(setData)

    emitter.value(setLoading)
    let y = await fetch(g(x))
    emitter.value(unsetLoading)
    emitter.value(setData)

    emitter.value(setLoading)
    let z = await fetch(g(y))
    emitter.value(unsetLoading)
    emitter.value(setData)
    emitter.end()
}));

replacing the calls to emitter.value as needed. I often use Kefir.stream to wrap imperative code like this as well.

ivan-kleshnin commented 6 years ago

Hmm, another very interesting idea. I totally forgot about this possibility. More kudos to you! 👍