ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

[BUG] buffer_with_time_or_count loses data #703

Open laurens-teirlynck opened 9 months ago

laurens-teirlynck commented 9 months ago

Describe the bug: buffer_with_time_or_count loses data when the timer triggers the emission and the on_next callback releases the GIL.

Related issue: https://github.com/ReactiveX/RxPY/issues/702. That issue, however, can be solved by using a scheduler, since its on_next method does not release the GIL.

To Reproduce: I have a microservice that reads data from an external source, buffers it, aggregates it, and sends it to another external service. The dummy snippet below is similar to my microservice and has the same issue.

The issue only occurs when the timer, rather than the count, triggers the on_next call.

import time
from datetime import timedelta, datetime
from random import random

import reactivex
from reactivex import operators
from reactivex.scheduler import ThreadPoolScheduler

def main():
    scheduler = ThreadPoolScheduler(max_workers=4)

    reactivex.from_iterable(iterable()).pipe(
        operators.buffer_with_time_or_count(
            # set the timespan to only 3 seconds so that the timer triggers the on_next
            timespan=timedelta(seconds=3),
            count=10000,
        ),
    ).subscribe(
        on_next=on_next,
        on_error=print,
        on_completed=print,
        scheduler=scheduler,
    )

    time.sleep(1000)

def iterable():
    i = 0
    while True:
        yield i
        time.sleep(1 / 10)  # input network delay
        i += 1

def on_next(data):
    print(datetime.utcnow(), data[0], data[-1])
    time.sleep(random() * 5)  # mock output network delay

if __name__ == '__main__':
    main()

Script output: Running the script as is gives something along the lines of:

2023-10-09 08:29:16.817278 0 29
2023-10-09 08:29:24.663460 76 104
2023-10-09 08:29:29.889651 127 155
2023-10-09 08:29:36.256481 188 216
2023-10-09 08:29:43.130694 254 282
2023-10-09 08:29:47.561785 297 325
2023-10-09 08:29:55.475441 374 402
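
To make the loss in the output above easier to see, here is a minimal sketch that computes which values never arrived. It is not part of the original script: `find_gaps` is a hypothetical helper name, and it assumes the source emits consecutive integers, as `iterable()` does.

```python
from typing import List, Tuple


def find_gaps(buffers: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Return (first_missing, last_missing) ranges between consecutive buffers.

    Hypothetical helper: assumes each buffer is summarized by its
    (first, last) value and that the source emits consecutive integers.
    """
    gaps = []
    for (_, prev_last), (next_first, _) in zip(buffers, buffers[1:]):
        if next_first > prev_last + 1:
            gaps.append((prev_last + 1, next_first - 1))
    return gaps


# (first, last) pairs copied from the timer-triggered output above
timer_triggered = [
    (0, 29), (76, 104), (127, 155), (188, 216),
    (254, 282), (297, 325), (374, 402),
]

gaps = find_gaps(timer_triggered)
lost = sum(last - first + 1 for first, last in gaps)

for first, last in gaps:
    print(f"missing {first}..{last} ({last - first + 1} values)")
print(f"total lost: {lost}")
```

For the run shown, every pair of consecutive buffers has a gap between them, and the gaps vary in size, as does the random delay in on_next.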

However, if I replace

operators.buffer_with_time_or_count(
            timespan=timedelta(seconds=3),  # mostly timer triggered
            count=10000,
        ),

with

operators.buffer_with_time_or_count(
            timespan=timedelta(minutes=30),
            count=100,  # count triggered
        ),

the output now looks like:

2023-10-09 08:32:29.754270 0 99
2023-10-09 08:32:41.434829 100 199
2023-10-09 08:32:56.777624 200 299
2023-10-09 08:33:10.419743 300 399
2023-10-09 08:33:24.681490 400 499
2023-10-09 08:33:35.890146 500 599

Expected behavior: buffer_with_time_or_count should not lose any data. The data loss occurred regardless of which scheduler I used (or with none at all).
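
The expected behavior can also be checked while the script runs. The following sketch wraps the subscriber callback and records any values skipped between buffers. `ContiguityChecker` is a hypothetical name, not part of reactivex, and it again assumes the source emits consecutive integers.

```python
from typing import Callable, List, Optional, Sequence


class ContiguityChecker:
    """Wrap an on_next callback and record values skipped between buffers.

    Hypothetical helper for the repro script; assumes buffers contain
    consecutive integers in emission order.
    """

    def __init__(self, inner: Callable[[Sequence[int]], None]) -> None:
        self._inner = inner
        self._last_seen: Optional[int] = None
        self.missing: List[range] = []

    def __call__(self, data: Sequence[int]) -> None:
        if data:
            # A buffer should start right after the previous one ended.
            if self._last_seen is not None and data[0] != self._last_seen + 1:
                self.missing.append(range(self._last_seen + 1, data[0]))
            self._last_seen = data[-1]
        self._inner(data)


# Standalone demonstration with hand-made buffers
checker = ContiguityChecker(lambda data: None)
checker([0, 1, 2])
checker([3, 4])
checker([8, 9])
print(checker.missing)
```

In the repro script this would be passed as `on_next=ContiguityChecker(on_next)`. If no data is lost, `checker.missing` stays empty.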
