ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.72k stars 356 forks source link

Debounce operator does not ignore values, it queues them with delay #658

Closed teodorkostov-es closed 1 year ago

teodorkostov-es commented 1 year ago

Describe the bug Debounce operator does not ignore values, it queues them with delay.

To Reproduce

test = rx.interval(0.1).pipe(
    ops.debounce(1.5, scheduler=CurrentThreadScheduler()),
  ).subscribe(print)

Truth be told, I am using a socket as the source. It's providing the data to a Subject in a thread different from the main one. All rx execution happens in that thread.

Expected behavior I would expect values from 2 to around 15 to be dropped.

Additional context Add any other context about the problem here.

dbrattli commented 1 year ago

You cannot use the CurrentThreadScheduler here. It cannot schedule blocking work on a single thread (current) without blocking everything else while waiting, so things will not behave as you expect. You need to use a thread-based scheduler e.g TimeoutScheduler, but perhaps ThreadPoolScheduler is a better choice.

teodorkostov-es commented 1 year ago

@dbrattli, thank you for the help. Unfortunately, these two schedulers do not get the job done. It seems that I have to change something in my threaded app.

When I use these schedulers, the entire pipeline freezes. Could the issue be that I block the main thread with some work while in parallel in another thread, the Rx stream runs with the debounce operator?

dbrattli commented 1 year ago

It it's not easy to say. Multi-threaded applications can be complex to get right. You will have to experiment and get into the details of this. But you will have to use another scheduler for debounce than the CurrentThreadScheduler.