kosich / rxjs-autorun

Re-evaluate an expression whenever Observable in it emits
MIT License
33 stars 2 forks source link

Allowing dependent observables? #12

Closed Jopie64 closed 3 years ago

Jopie64 commented 3 years ago

Consider this scenario:

const myPresence$ = run(() => {
   const userId = $(loggedInUser$);
   return $(getPresence(userId));
});

At first sight, to me it looked like this is going to work well. But on second thought... What if the presence receives it's first value? The expression will run again. Hence it will call getPrecense(userId) again which might (probably) return a new observable. That new observable will be observed immediately. So when it emits synchronously it will end up in an infinite loop or yield a stack overflow!

So currently this lib doesn't support dependent observables like this. Question is, do we want to allow this? And if so, how would we do this? My first thought was, detecting whether the argument of getPresence is still the same, and if it is, don't run the function but use the old value. But I currently can't think of a way to check that, and also none to block running that function. Maybe require notation $(() => getPresence(userId)) or something? And include dependent arguments like $(userId, () => getPresence(userId))? Looks a bit ugly maybe... Any other ideas? Or just drop this issue?

kosich commented 3 years ago

Yep, issuing observables inside run is not straightforward now. I'm also worried about $(a.pipe(ā€¦)) scenarios ā€” people would try to do that often, imho.

As to getPresence(userId) ā€” it seems like a good-ol' memoization issue (or "cache"? "dependent re-run"? not sure about the smart name)

We just need to create that memoized fn outside run, a very naive implementation:

const cache = fn => {
  let executed = false;
  let cachedO = null;
  let cachedResult = null;
  return o => {
    if (executed && Object.is(cachedO, o)) return cachedResult;
    cachedResult = fn(o);
    cachedO = o;
    executed = true;
    return cachedResult;
  }
}

let cachedGetPresence = cache(getPresence);

const myPresence$ = run(() => {
   const userId = $(loggedInUser$);
   return $(cachedGetPresence(userId));
});

There are many such 3rd-party implementations, so I think we can safely release w/o such a feature now, knowing that it's work-around-able.

Maybe, in the future we'll add some additional API to make it easier, something similar to what you're saying: run a fn, dependent on these observables.

Now that I wrote this sentence... šŸ¤” It sounds crazy, but run does exactly that, right? šŸ˜…

Would something like this work?

const getPresence$ = run(() => {
  let userId = $(loggedInUser$);
  return getPresence(userId);
});

const getMyPresence$ = run(() => $($(getPresence$)));

both snippets were written in github comment, so they might be full of errors

Jopie64 commented 3 years ago

Memoization could be a solution indeed! Didn't think about that. Not really obvious that you should use that though, I think it should be clear from the docs.

I'm not sure how that example should work.... Whenever getPresence$ is subscribed indirectly, I can't see why that would cause a difference in behavior...

kosich commented 3 years ago

Here's an actual test that I tried today:

it('the trick', () => {
    const fn = x => of(x + 1);
    const source$ = of(1);

    const fn$ = run(() => fn($(source$)));
    const result$ = run(() => $($(fn$)));

    sub = result$.subscribe(observer);
    expect(observer.next.mock.calls).toEqual([[2]]);
});

The idea is that fn returns an Observable. So fn$ is an Observable of fn-results: Observable<Observable<number>>and it will emit only when source$ changes.

And if the second run re-evaluates ā€” it will always get the latest value from fn$ stream ā€” which, unlike re-running fn each time, is stable.

It seems to be equal to a switchMap.

Does it make sense?

When I realized yesterday that we're inventing what we already invented ā€” I giggled for 10 minutes straight šŸ™‚

kosich commented 3 years ago

While I agree that the behavior with functions recalls and .pipe repipes should be well documented. And the two solutions (if the second works, I'm still not 100% sure šŸ™‚) ā€” should be explained.

Would that be enough for now? If so, I'll mark it as documentation-related. And we'll create a new issue if/when we'll need a dedicated API. Is that ok?

LMKWYT

Jopie64 commented 3 years ago

Yes I think that's ok for now :)

kosich commented 3 years ago

BTW, I've got another interesting package that might play well with this one.

Start of shameless advertising

import { of } from 'rxjs';
import { proxify } from 'rxjs-proxify';

const source$ = of({ a: 1 }, { a: 2 }, { a: 3 });
const result$ = proxify(source$).a;
result$.a ā‰ˆ source$.pipe( pluck('a') );
// although:
result$.a === result$.a

package: rxjs-proxify

--

And based on this I've created a concept of an observable state:

// Simple Observable State

import { createState } from "./createState";

// create a state
const state = createState({ count: 0, timer: 100 });

// listen to state changes
state.count
  .subscribe(c => console.log('C:', c)); // > C:0

// write to the state
state.count.next(1); // > C:1

// reset the state
state.next({ timer: 2, count: 2 }) // > C:2

state.count += 1; // > C:3

state.count.next(3); // ignored: same value

// read current values:
console.log(state.timer + state.count); // > 2 + 3 = 5

yet unpublished concept: https://stackblitz.com/edit/rstate-united?file=index.ts

And this might be well combined with autorun:

const state = createState({ a: 0, b: 0 });
const c = compute(() => $(state.a) + $(state.b));
c.subscribe(console.log); // > log 0
state.a.next(1); // > log 1
state.b.next(1); // > log 2

End of shameless advertising

FYI šŸ™‚

Jopie64 commented 3 years ago

Nice! I'll take a look at it soon. On first sight, it looks like the opposite of something I once made for our project :D (Copy/paste)

// Helper types needed for combineObs
type ObservableRecord = Record<keyof any, Observable<any>>;
type ExtractFromObservableRecord<T extends ObservableRecord, K extends keyof T> = T[K] extends Observable<infer U> ? U : never;
type PlainFromObservables<T extends ObservableRecord> = { [key in keyof T]: ExtractFromObservableRecord<T, key> };

// combineObs wraps combineLatest but one can name each observable. The same names are
// reflected in the output value. See the spec for usage examples.
export const combineObs = <T extends ObservableRecord>(obs: T): Observable<PlainFromObservables<T>> => {
  // casting to any because typescript doesn't agree about the return types
  if (isEmpty(obs)) {
    return of({} as any);
  }

  type MakeObjFromValues = (values: any[]) => PlainFromObservables<T>;

  const keys = Object.keys(obs);
  const makeObjFromValues = zipObj(keys) as MakeObjFromValues;

  return combineLatest(keys.map(k => obs[k])).pipe(
      map(makeObjFromValues));
};

Which can be used like


const out$ = combineObs({
  a: new Subject<number>(),
  b: new Subject<string>()
});

// TypeScript will deduce type of out$ to be like Observable<Out> where out is:
type Out = { a: number, b: string }