ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

[BUG] buffer_with_time_or_count lost some data #702

Open fanck0605 opened 9 months ago

fanck0605 commented 9 months ago

Describe the bug

buffer_with_time_or_count drops some items. In the reproduction below, only 99,986 of the 100,000 emitted values reach the subscriber.

To Reproduce

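import reactivex
from reactivex import operators

result = []
(
    reactivex.range(100_000)
    .pipe(
        # Emit a buffer every 1 ms or every 10,000 items, whichever comes first.
        operators.buffer_with_time_or_count(timespan=0.001, count=10_000),
    )
    .subscribe(on_next=lambda x: result.extend(x))
)
print(f"len(result) = {len(result)}")
# Every emitted value should arrive exactly once, in order.
assert result == [*range(100_000)]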

Result:

len(result) = 99986
Traceback (most recent call last):
  File "D:\Projects\rxdemo\test_buffer_with_time_or_count.py", line 16, in <module>
    assert result == [*range(100_000)]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

Expected behavior

All 100,000 values are delivered in order, so the assertion passes:

assert result == [*range(100_000)]

matiboy commented 9 months ago

Hi @fanck0605, thank you for opening this issue. This looks quite similar to #694 and appears to be a threading issue. Using an event loop scheduler should solve the problem:

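import threading
import reactivex
from reactivex import operators
from reactivex.scheduler import EventLoopScheduler

event_loop = EventLoopScheduler()
result = []
done = threading.Event()  # set when the stream completes
reactivex.range(100_000).pipe(
    operators.buffer_with_time_or_count(timespan=0.001, count=10_000),
).subscribe(on_next=lambda x: result.extend(x), on_completed=done.set, scheduler=event_loop)
done.wait(timeout=30)  # in case the pipeline finishes on the scheduler's thread
assert result == list(range(100_000))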

Hope this helps
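
To illustrate why a threading issue of this kind can lose items, here is a minimal standard-library sketch of a buffer that is flushed both by count (from the producer's thread) and by a timer (from a second thread). It is not RxPY's implementation: `TimeOrCountBuffer` and `collect` are hypothetical names made up for this example, and the assumption is simply that both flush paths touch the same list. Guarding every append and flush with one lock serializes the two paths, which is roughly the effect of running everything on a single event loop thread.

```python
import threading


class TimeOrCountBuffer:
    """Hypothetical stand-in for a time-or-count buffer (not RxPY code)."""

    def __init__(self, timespan, count, sink):
        self._timespan = timespan
        self._count = count
        self._sink = sink              # called with each flushed batch
        self._items = []
        self._lock = threading.Lock()  # serializes appends and flushes
        self._closed = threading.Event()
        self._timer = threading.Thread(target=self._tick, daemon=True)
        self._timer.start()

    def _flush_locked(self):
        # Caller must hold the lock, so no append can slip in between
        # handing off the old list and installing the new one.
        if self._items:
            batch, self._items = self._items, []
            self._sink(batch)

    def _tick(self):
        # Time-based flush, running on a separate thread until close().
        while not self._closed.wait(self._timespan):
            with self._lock:
                self._flush_locked()

    def push(self, item):
        with self._lock:
            self._items.append(item)
            # Count-based flush, running on the producer's thread.
            if len(self._items) >= self._count:
                self._flush_locked()

    def close(self):
        # Stop the timer thread, then flush whatever is left.
        self._closed.set()
        self._timer.join()
        with self._lock:
            self._flush_locked()


def collect(n, timespan=0.001, count=10_000):
    """Push 0..n-1 through the buffer and return everything the sink received."""
    result = []
    buf = TimeOrCountBuffer(timespan, count, result.extend)
    for i in range(n):
        buf.push(i)
    buf.close()
    return result


if __name__ == "__main__":
    out = collect(100_000)
    print(f"len(out) = {len(out)}")
```

Without the lock, the timer thread could swap out the list while the producer is appending to it, which is one plausible way for a few items to go missing. Whether that is the exact cause in this issue is not confirmed here.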