Infinite loop from Buffer and ReplaySubject interaction #1985

Open
Norskan opened this issue Aug 29, 2023 · 0 comments

Norskan commented Aug 29, 2023

Which library version?
The bug occurs with versions 5.0 and 6.0.

What is the use case or problem?
We have a continuous stream of data that we want to process in chunks. To achieve this, we use a buffer that we can flush when needed.

public static IObservable<IList<TSource>> FlushableBuffer<TSource>(this IObservable<TSource> source, int max, ISubject<Unit> manualFlushSignal)
{
    // Fires for the max-th item, and every later item, seen since subscription.
    var bufferOverFlowFlushSignal = source.Skip(max - 1)
                                          .Select(_ => Unit.Default);
    // Flush either on manual request or when the buffer overflows.
    var mergedSignals = manualFlushSignal.Merge(bufferOverFlowFlushSignal);

    return source.Buffer(() => mergedSignals);
}

This works fine with BehaviorSubject and Subject, but with ReplaySubject we get an infinite loop when the buffer is full.
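
A possible explanation, which is an assumption and not confirmed in this issue: the closing selector passed to Buffer is invoked for each new buffer, so mergedSignals is subscribed to again every time a buffer closes. A ReplaySubject replays its stored values to every new subscriber, whereas a plain Subject does not, which could make each fresh buffer close immediately. The sketch below only illustrates that replay difference; the variable names are hypothetical and it does not reproduce the loop itself.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical names, for illustration only.
var replay = new ReplaySubject<int>();
var plain = new Subject<int>();

// Push three values before anyone subscribes.
for (var i = 1; i <= 3; i++)
{
    replay.OnNext(i);
    plain.OnNext(i);
}

var seenByReplay = new List<int>();
var seenByPlain = new List<int>();

// A late subscriber to the ReplaySubject receives the stored values on subscribe;
// a late subscriber to the plain Subject receives nothing.
using (replay.Subscribe(x => seenByReplay.Add(x))) { }
using (plain.Subscribe(x => seenByPlain.Add(x))) { }

Console.WriteLine(string.Join(",", seenByReplay)); // 1,2,3
Console.WriteLine(seenByPlain.Count);              // 0
```

If this is what happens, every re-subscription made by Buffer would see the replayed items at once, which would match the loop starting only after the buffer has filled.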

I wrote some tests to illustrate this issue:
ReactiveReplaySubjectFlushableBuffer.zip

Best Regards
Simon
