ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.77k stars 361 forks source link

Add combine_throttle operator #711

Open kormang opened 8 months ago

kormang commented 8 months ago

Merges the specified observable sequences into one observable sequence by creating a result whenever all of the observable sequences have produced an element at a corresponding index. Faster observables, that emits events more frequently, are throttled, so that they match speed of slower observables. Speed of emitting items matches speed of slowest source observable. It is somewhat similar to :func:reactivex.zip operator, returns tuple, but items of faster observables are dropped, so that only latest values are emitted at each index. It is also similar to :func:reactivex.combine_latest, but emits new item only when all sources produce new item. Only latest items are included in resulting tuple, others are dropped, similar to :func:reactivex.with_latest_from.

.. marble::
    :alt: combine_throttle

    --1---2-3--------4---|
    -a-------b--c-d------|
    [ combine_throttle() ]
    --1,a----3,b-----4,d-|

Combined with zip, combine_latest, and with_latest_from, this seems to be the missing peace to have all the tools for reactive state management. This allows as to be notified when state that consists of multiple parts, is completely changed, rather then when only some parts of it changed. This is the way zip works, but zip doesn't provide us with the latest state if observables emit at different speed.

coveralls commented 8 months ago

Coverage Status

coverage: 93.49% (+0.06%) from 93.433% when pulling dd5cf2176e10cd5b95eb82622f4d6f7a89ce379e on kormang:combine_throttle into af1663d35810fdcd4c25a3ed2e8f0d71b55c341d on ReactiveX:master.

kormang commented 8 months ago

Hello!

I've used this operator in my code for some time, with outdated version of RxPY. Now, I decided to cut it out of my code and put in RxPY library, and update it.

I've covered it with tests, and run the pre-commit checks (although I had to update isort locally, see).

I'm sure you'd request some changes, but first questions is:

Is it even OK to add new operators?

Then another question is:

Is this operator valid and useful? (It was for me).

If it is OK, I'll make PR to RX C++ as well. I could also contribute to RxJS, as I feel fluent enough in Python, C++ and JavaScript to make the same contribution to corresponding libraries. I might as well contribute to RxJava, but would require to much for to setup the environment, I haven't done it in a while, but I could do that too.

P.S. I had one contribution to RxPY three years ago :)

kormang commented 8 months ago

I seems that code quality check are failing for this reason: https://stackoverflow.com/questions/75269700/pre-commit-fails-to-install-isort-5-11-4-with-error-runtimeerror-the-poetry-co

All that is required is to update isort to 5.12.0, or newer. I could do that as well.