gsvarovsky / rx-flowable

RxJS observables with lossless back-pressure
MIT License

Discussion: allow flowable to process multiple values simultaneously #3

Open prevostc opened 1 year ago

prevostc commented 1 year ago

Hi there,

I just stumbled upon your trail of thoughts regarding rxjs and backpressure:

I built some kind of ETL using RxJS and I wanted to process "as many values as possible at the same time".

At the moment I have a pipeline (Observable) taking work units as input, doing lots of complex stuff and yielding success/failure status as output. Obviously, I find myself in a position where I can't ask the pipeline to process all work units at once, so I need to do some kind of backpressure handling.

My idea would be to use a threshold on process.memoryUsage() and process.cpuUsage() to know whether the server can handle more work, and if so, send more work units into the observable without waiting for the current ones to finish.
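Roughly, I picture a capacity check along these lines (the threshold and helper name are just placeholders for illustration):

// Illustration only: an arbitrary heap threshold; a real check might also
// sample process.cpuUsage() deltas over an interval to estimate CPU load.
const MAX_HEAP_BYTES = 1.5e9;

function serverHasCapacity(): boolean {
  return process.memoryUsage().heapUsed < MAX_HEAP_BYTES;
}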

I'm not sure that would fit the scope of this library, but if it does, I would be happy to contribute.

Thank you for your work <3

gsvarovsky commented 1 year ago

Hi Clément, thanks for your interest and your use-case!

If you have a Consumable of "values" and you want to process them in parallel, you can call next before you've finished with the current one, e.g.:

values.subscribe(async ({ value, next }) => {
  await server.hasCapacity;                   // wait until the server can accept more work
  server.process(value).catch(console.error); // start processing; don't await completion
  next(); // request the next value immediately, without waiting for processing
});

Try it here

(You would need to consider what to really do with errors.)
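For illustration only, the server in that snippet could be as simple as a wrapper that caps in-flight work, with hasCapacity resolving once the count drops below a limit (all the names here are made up for the example):

// Sketch only: caps in-flight work at `limit`; hasCapacity resolves when
// there is room for another work unit.
class ThrottledServer<T> {
  private inFlight = 0;
  private waiters: (() => void)[] = [];

  constructor(private limit: number, private work: (value: T) => Promise<void>) {}

  get hasCapacity(): Promise<void> {
    return this.inFlight < this.limit
      ? Promise.resolve()
      : new Promise(resolve => this.waiters.push(resolve));
  }

  async process(value: T): Promise<void> {
    this.inFlight++;
    try {
      await this.work(value);
    } finally {
      this.inFlight--;
      this.waiters.shift()?.(); // wake one waiter, if any
    }
  }
}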

I think in most real cases the "server" here would not be "this server", i.e. this very Node process, because the heavy lifting is likely to be somewhere else, like an analytical engine. So, having some utility in this library to throttle based on local memory or CPU is probably not the right abstraction, unless it's just an example.

Possibly we could have a throttle operator that accepts the hasCapacity and process functions as arguments. There's probably a more "Rx" way to do that though. 🤔

prevostc commented 1 year ago

Hi sir,

Thanks a lot for your answer and demo! (amazing btw) I'm struggling to wrap my head around it rn but I'll eventually manage :D

having some utility in this library to throttle based on local memory or CPU is probably not the right abstraction, unless it's just an example.

Totally agree.

Possibly we could have a throttle operator that accepts the hasCapacity and process functions as arguments.

That would be a great addition; Rx is definitely missing some kind of losslessThrottleWhen operator (I think that would be the right "Rx" name for this). But maybe that should be a package on its own, idk.

gsvarovsky commented 1 year ago

Thinking about it, the operator could take just the hasCapacity signal, and effectively collapse the Consumable back into a plain Observable, which the server then subscribes to.

Taking inspiration from RxJS operators like delayWhen, it would make sense to have the hasCapacity signal be an Observable rather than a Promise.

So, I think our throttleWhen operator could have the signature:

throttleWhen<T>(capacitySignal: (value: T, index: number) => Observable<any>): OperatorFunction<Bite<T>, T>
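A minimal sketch of an implementation (untested, and using an inline stand-in for Bite<T> rather than the library's actual export):

import { concatMap, first, map, tap, Observable, OperatorFunction } from 'rxjs';

// Stand-in for the library's Bite<T>: the emitted value plus its next() callback.
interface Bite<T> { value: T; next: () => void; }

// Sketch only: gate each value on one emission of its capacity signal, then
// release the value downstream and ask upstream for the next bite.
function throttleWhen<T>(
  capacitySignal: (value: T, index: number) => Observable<unknown>
): OperatorFunction<Bite<T>, T> {
  return source => source.pipe(
    concatMap((bite, index) =>
      capacitySignal(bite.value, index).pipe(
        first(),                 // one capacity "tick" releases one value
        tap(() => bite.next()),  // request the next bite from upstream
        map(() => bite.value)
      )
    )
  );
}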

I'm a bit backlogged at the moment, but if you'd like to have a go I'd be happy to review!

prevostc commented 1 year ago

That would be a nice design!

For reference, here is what I ended up doing in the meantime. This is very ugly but hey, it works for now :D https://codesandbox.io/s/distracted-merkle-bx8k5z

I will definitely give this proper throttleWhen a try soon!

prevostc commented 1 year ago

All riiiiight.

So, long story short, the previous version silently swallowed all exceptions, so I rewrote it using delayWhen and an ever-growing array of observables: https://codesandbox.io/s/distracted-merkle-bx8k5z?file=/src/throttle-when.ts
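For a gist of the delayWhen part (this is not the sandbox code, just the gate idea, assuming a hasCapacity predicate to poll):

import { delayWhen, filter, first, interval, MonoTypeOperatorFunction } from 'rxjs';

// Gist only: delay each value until the capacity check passes, polling every 100ms.
const whenCapacity = <T>(hasCapacity: () => boolean): MonoTypeOperatorFunction<T> =>
  delayWhen(() => interval(100).pipe(filter(hasCapacity), first()));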

Far from perfect but that's a start.