[BUG] buffer_with_time_or_count loses data #703

Open
laurens-teirlynck opened this issue Oct 9, 2023 · 0 comments

Describe the bug
buffer_with_time_or_count loses data when the timer is the trigger and the on_next releases the GIL.

Related issue: #702. That issue can be solved by using a scheduler, because its on_next method does not release the GIL; here the on_next does release it.

To Reproduce
I have a microservice that reads data from an external source, buffers it, aggregates it, and sends it to another external service. The dummy snippet below mimics that microservice and shows the same issue.

The issue only occurs if the timer is triggering the on_next call.

import time
from datetime import timedelta, datetime
from random import random

import reactivex
from reactivex import operators
from reactivex.scheduler import ThreadPoolScheduler


def main():
    scheduler = ThreadPoolScheduler(max_workers=4)

    reactivex.from_iterable(iterable()).pipe(
        operators.buffer_with_time_or_count(
            # set the timespan to only 3 seconds so that the timer triggers the on_next
            timespan=timedelta(seconds=3),
            count=10000,
        ),
    ).subscribe(
        on_next=on_next,
        on_error=print,
        on_completed=print,
        scheduler=scheduler,
    )

    time.sleep(1000)


def iterable():
    i = 0
    while True:
        yield i
        time.sleep(1 / 10)  # input network delay
        i += 1


def on_next(data):
    print(datetime.utcnow(), data[0], data[-1])
    time.sleep(random() * 5)  # mock output network delay


if __name__ == '__main__':
    main()

Script output
Running the script as is gives something along the lines of:

2023-10-09 08:29:16.817278 0 29
2023-10-09 08:29:24.663460 76 104
2023-10-09 08:29:29.889651 127 155
2023-10-09 08:29:36.256481 188 216
2023-10-09 08:29:43.130694 254 282
2023-10-09 08:29:47.561785 297 325
2023-10-09 08:29:55.475441 374 402
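
To make the loss concrete, here is a small sketch that computes which values never reached on_next. It assumes the last two fields of each output line are the first and last element of a buffer; `find_gaps` and `OBSERVED` are hypothetical names for this illustration and are not part of reactivex.

```python
from typing import List, Tuple

# (first, last) element of each buffer, copied from the timer-triggered output above.
OBSERVED: List[Tuple[int, int]] = [
    (0, 29), (76, 104), (127, 155), (188, 216),
    (254, 282), (297, 325), (374, 402),
]


def find_gaps(buffers: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Return the inclusive (start, end) ranges missing between consecutive buffers."""
    gaps = []
    for (_, prev_last), (next_first, _) in zip(buffers, buffers[1:]):
        # The source yields consecutive integers, so any jump means lost values.
        if next_first > prev_last + 1:
            gaps.append((prev_last + 1, next_first - 1))
    return gaps


if __name__ == "__main__":
    gaps = find_gaps(OBSERVED)
    lost = sum(end - start + 1 for start, end in gaps)
    print(gaps)
    print(f"lost {lost} of {OBSERVED[-1][1] + 1} values")
```

For the output above this reports 199 of the first 403 values as missing. Every gap falls between two timer-triggered buffers.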

However, if I replace

operators.buffer_with_time_or_count(
            timespan=timedelta(seconds=3),  # mostly timer triggered
            count=10000,
        ),

with

operators.buffer_with_time_or_count(
            timespan=timedelta(minutes=30),
            count=100,  # count triggered
        ),

the output now looks like:

2023-10-09 08:32:29.754270 0 99
2023-10-09 08:32:41.434829 100 199
2023-10-09 08:32:56.777624 200 299
2023-10-09 08:33:10.419743 300 399
2023-10-09 08:33:24.681490 400 499
2023-10-09 08:33:35.890146 500 599

Expected behavior
buffer_with_time_or_count should not lose any data. The loss occurred regardless of which scheduler I used (or with none at all).

Additional context

  • OS: Linux/MacOS
  • RxPY: 4.0.0
  • Python: 3.10.0/3.11.0