ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

Examples from contributor days #2324

Open · jkup opened this issue 7 years ago

jkup commented 7 years ago

At RxJS contributor days we discussed adding an examples/ folder to the repo and populating it with some real use cases for RxJS. Here are the examples we came up with:

benlesh commented 7 years ago

This is fantastic!

rgbkrk commented 7 years ago

Thanks for adding this as a checklist. I've been hoping to come back to a few of these with some narrative-style docs. 😄

shrynx commented 7 years ago

This looks like a nice place to start contributing. Is there any update or plan to incorporate it?

When I started with RxJS, most available examples were for Rx 4 (still are?), and I kept looking up the migration guide. This would make things a lot simpler for people picking up RxJS.

rgbkrk commented 7 years ago

Feel free to pick a topic and start focusing on the content. It could be a blog post or some new markdown to incorporate here as narrative-style docs.

shrynx commented 7 years ago

I did start writing a blog post, but I have been too busy to continue it; I should get back to it soon. Thanks for the reminder :) But I was referring to having an examples folder like the one in the RxJS 4 repo.

benlesh commented 6 years ago

@ladyleet I don't want to lose track of this for the docs team... can you please bring this to their attention? I'll ping them too

kievsash commented 6 years ago

Any progress on documenting these topics?

  1. Schedulers (dealing with large, stack-destroying recursive processes): very interested in digging deeper here.
  2. Re-entrant examples (see RxJS issues for specific problems): can you provide issue links or numbers, please?

niklas-wortmann commented 6 years ago

We want to start with some kind of recipes section, but as far as I know there is no progress at the moment. I would really appreciate any support on this topic :)

kievsash commented 6 years ago

Well, I plan to write an article with scheduler/recursion/re-entrancy SO examples, but probably not for another month; I will post the link here then. For now I have found some interesting issues (mostly closed) to use as a source of examples:

  1. https://github.com/ReactiveX/rxjs/issues/2341
  2. https://github.com/ReactiveX/rxjs/issues/3270
  3. https://github.com/ReactiveX/rxjs/issues/2414
  4. https://github.com/ReactiveX/rxjs/pull/3621#issuecomment-386356358 (cannot reproduce on RxJS 6.2) and the same issue: https://github.com/ReactiveX/rxjs/issues/3609
  5. https://github.com/ReactiveX/rxjs/issues/4172#issuecomment-423841815
  6. And a nice comment from https://github.com/ReactiveX/rxjs/issues/2935:

For example range(0, 100000, Scheduler.asap) will efficiently emit 100k numbers over 100k macrotasks without buffering. range(0, 100000).observeOn(Scheduler.asap) will immediately buffer 100k notifications as AsapActions into the AsapScheduler queue, which will all be flushed synchronously on the next macrotask. Not the same thing at all.

I created a playground for this: https://codepen.io/kievsash/pen/QZONpW

If someone could add more interesting real-world issues involving improper usage of schedulers due to recursive calls, that would be great.
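
For reference, the quoted range vs. observeOn comparison uses RxJS 5 names. A rough RxJS 6 equivalent, as a sketch that only restates the quoted behavior, would be:

import { range, asapScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

// The scheduler produces the values itself, one scheduled action at a time,
// so nothing is buffered up front (per the quoted comment).
range(0, 100000, asapScheduler).subscribe();

// range emits all 100k values synchronously; observeOn wraps every notification
// in a scheduled action, so they pile up in the scheduler's queue first.
range(0, 100000).pipe(observeOn(asapScheduler)).subscribe();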

kievsash commented 5 years ago

I just wrote an article about the difference between the queue scheduler and no scheduler: https://medium.com/@alexanderposhtaruk/so-how-does-rx-js-queuescheduler-actually-work-188c1b46526e. If you find it valuable, feel free to use it for the documentation.
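
As a quick illustration of the behavior the article is about, here is a minimal sketch (my own example, not taken from the article) of how queueScheduler turns nested scheduling into iteration:

import { queueScheduler } from "rxjs";

// Scheduling from inside an already-running queued task does not recurse:
// the inner task is queued and runs only after the current task returns.
queueScheduler.schedule(() => {
    console.log("outer start");
    queueScheduler.schedule(() => console.log("inner"));
    console.log("outer end");
});
// Output (all synchronous): "outer start", "outer end", "inner"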

olegomon commented 5 years ago

@benlesh The previous discussion on the backpressure story (#71) has been locked, so I'll try to restart the discussion on one of the examples above here.

I've created my own custom backpressure operator based on existing RxJS operators, and I would like to get the RxJS community's opinion on my implementation and how it compares to other approaches out there.

backpressure.ts

import {generate, merge, Observable, OperatorFunction, Subject, zip} from "rxjs";
import {finalize, concatMap} from "rxjs/operators";

/**
 * @param consumer projection that maps each source value (and its index) to an inner Observable
 * @param bufferSize how many source values may be pulled ahead of the consumer at once
 */
export const backpressure = <T, R>(consumer: (value: T, index: number) => Observable<R>, bufferSize: number = 10): OperatorFunction<T, R> => {

    return (source$: Observable<T>): Observable<R> => {

        // subject for notifying/pulling the next value from the source observable
        const trigger$ = new Subject<void>();

        // initial notifications:
        // they let the first bufferSize source values pass through zip right away
        const buffer$ = generate(0, (n) => n < bufferSize, (i) => i + 1);

        // notifiers stream to pull next value
        const notifiers$ = merge(trigger$, buffer$);

        // each time the notifiers observable emits (initially, and whenever a consumer completes),
        // the next value from the source is taken and pumped through the consumer observable
        return zip(source$, notifiers$).pipe(
            concatMap(([value], index) => consumer(value, index).pipe(
                // trigger the next pull as soon as one of the consumer observables has completed
                finalize(() => trigger$.next())
            ))
        );
    };
};

example.ts

import {from, of, range} from "rxjs";
import {bufferCount, delay, map, mergeMap, tap, toArray} from "rxjs/operators";
import {backpressure} from "./backpressure";

function random(max = 1000) {
    return Math.floor(Math.random() * Math.floor(max));
}

const total = 30;
const bufferSize = 5;
const batchSize = 10;
const timeout = 5000;

const numbers$ = range(1, total).pipe(
    backpressure((value, index) => of(value).pipe(
        // simulate work
        delay(random(timeout))
    ), bufferSize)
);

numbers$.subscribe((result) => console.log(result));

// apply backpressure to each buffer group independently
const batches$ = range(1, total).pipe(

    // split values by buffer size
    bufferCount(batchSize),

    tap((batch) => console.log("process batch", batch)),

    mergeMap((batch) => from(batch).pipe(
        // apply backpressure to each batch
        backpressure((value, index) => of(value).pipe(
            // simulate work
            delay(random(timeout))
        ), bufferSize),
        toArray()
    )),
    map(result => result.sort((a, b) => a - b))
);

batches$.subscribe((batch) => console.log("finish batch", batch));

kievsash commented 5 years ago

In the example above, if the source produces many values, 'zip' will buffer them. With intensive source emissions this can exhaust memory. But of course, this cannot be solved unless we also suppress the source's ability to emit.

olegomon commented 5 years ago

@kievsash I thought about that aspect. The user of the operator is still able to throttle the source stream or apply any buffering strategy on the input source.

kievsash commented 5 years ago

@olegomon Interesting topic to discuss, by the way. There are two possibilities:

  1. To prevent over-buffering, throttle the source: the source continues to produce, but you ignore (lose) some of the produced values. (Suspending the source is not possible if it acts as a hot observable.) Your example works with this case; see the sketch after this list.
  2. Suspend the source, for cold observables used as a source: in that case the source acts like a generator function and you do not lose any values, but the source has to provide some kind of suspend interface.
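
A minimal sketch of option 1, assuming the backpressure operator from the earlier comment and a fast source (the interval and throttle periods are arbitrary placeholders): the source keeps producing, throttleTime drops the excess, and only the surviving values reach the operator, so the buffers inside its zip stay bounded.

import { interval, of } from "rxjs";
import { delay, take, throttleTime } from "rxjs/operators";
import { backpressure } from "./backpressure";

// Fast source: one value every 10 ms, limited to 1000 values for the demo.
const fast$ = interval(10).pipe(take(1000));

fast$.pipe(
    // Option 1: ignore values emitted faster than one per second; some values are lost,
    // but the consumers (~500 ms each) can keep up, so nothing piles up inside zip.
    throttleTime(1000),
    backpressure((value) => of(value).pipe(delay(500)), 5)
).subscribe((result) => console.log(result));
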
benlesh commented 4 years ago

Bump. This issue makes me sad every time I look at it. There's so much to do, and not enough help. 😿

six-edge commented 3 years ago

I would like to contribute to the Request/Reply pattern on WebSockets topic.

I made an example of an async request/response method here: https://github.com/ReactiveX/rxjs/discussions/5955#discussioncomment-450429 (a rough sketch of the idea is also included after the questions below).

Should I:

  1. Extend the WebSocketSubject with the new async request method?
  2. Open a separate issue to track this?
  3. Open up a PR for this task/issue?
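
For context, here is a rough sketch of the request/reply idea over a WebSocketSubject. It assumes a hypothetical message shape in which the server echoes a correlation id back on its reply; the names, message shape, and timeout are placeholders, not the API proposed in the linked discussion.

import { Observable } from "rxjs";
import { filter, first, timeout } from "rxjs/operators";
import { WebSocketSubject } from "rxjs/webSocket";

// Hypothetical message shape: every message carries a correlation id.
interface Message { id: string; payload?: unknown; }

// Send a request and complete with the first reply carrying the same id.
function request(socket$: WebSocketSubject<Message>, payload: unknown): Observable<Message> {
    const id = Math.random().toString(36).slice(2); // naive correlation id
    return new Observable<Message>((subscriber) => {
        // Start listening for the matching reply before sending, to avoid a race.
        const sub = socket$.pipe(
            filter((msg) => msg.id === id),
            first(),
            timeout(5000) // give up if no reply arrives in time
        ).subscribe(subscriber);
        socket$.next({ id, payload });
        return () => sub.unsubscribe();
    });
}

// Usage (placeholder URL):
// import { webSocket } from "rxjs/webSocket";
// const socket$ = webSocket<Message>("wss://example.test/ws");
// request(socket$, { action: "ping" }).subscribe(console.log);

The first()/timeout() combination makes each request observable complete or error on its own, independently of the shared socket; whether this belongs on WebSocketSubject itself or in a standalone helper is exactly the question above.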