Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window and Buffer operators omit items which are released immediately #2091

Open
adamjones2 opened this issue Mar 10, 2024 · 2 comments
Open

Comments

@adamjones2
Copy link

I'm trying to partition a stream into windows according to a predicate on the elements. That is, implement a function like

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isStartOfNewWindow);

(Incidentally, I feel like this is a common use case that should be part of the library.)
I looked in the IntroToRx docs and found the recommended approach is this:

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isStartOfNewWindow)
{
    var shared = source.Publish().RefCount();
    var windowEdge = shared.Where(isStartOfNewWindow).Publish().RefCount();
    return shared.Window(windowEdge, _ => windowEdge);
}

A simple test reveals this does appear to work well:

var source = Observable.Interval(TimeSpan.FromSeconds(1));
var windowed = source.Window(x => x == 0 || x % 5 == 2).SelectMany(o => o.ToList());
windowed.Subscribe(xs => Console.WriteLine(string.Join(", ", xs)));

This prints 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, etc as expected.

However, if I now prepend some items to the source sequence, it does not work correctly:

var source = Observable.Interval(TimeSpan.FromSeconds(1)).Prepend(-1); // Prepend -1
var windowed = source.Window(x => x == -1 || x % 5 == 2).SelectMany(o => o.ToList()); // Change x == 0 condition to x == -1, as that's now the first item
windowed.Subscribe(xs => Console.WriteLine(string.Join(", ", xs)));

This prints the same as the first example - ie. ignoring the added -1, even though that should now participate in the first window and the first line should be -1, 0, 1. I observe the same behaviour with the analogous Buffer operator. I also notice defining source instead by

var source = Observable.Defer(async () => { return Observable.Return(-1L); }).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

has the same bad behaviour, but

var source = Observable.Defer(async () => { await Task.Delay(1); return Observable.Return(-1L); }).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

does not, and includes the -1 correctly. I obviously don't want to be introducing artificial delays into my streams though as a solution.

What is the correct way to implement the function I need, regardless of the timing of the events in the input sequence? If it's what I already did, can a fix be implemented in Window and Buffer for this behaviour?

@adamjones2
Copy link
Author

adamjones2 commented Mar 11, 2024

I also note

var source = Observable.Return(-1L).ObserveOn(ThreadPoolScheduler.Instance).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

works, but the windowing function can't modify the source that way without putting the entire thing on that scheduler, so this is only a hack of the input to workaround the function's deficiency and not a true solution. Perhaps useful for diagnosis though.

EDIT: I've also realised this causes the subscribe action of the original sequence (Observable.Interval(TimeSpan.FromSeconds(1))) to be offloaded onto another thread as well, which is causing issues for my use case, so I can't use it even as a workaround.

@adamjones2
Copy link
Author

In the end I had to hand-spin the implementation of the function a different way, but I'd still appreciate some insight into this and would suggest the below as a candidate for inclusion into the library.

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isWindowStart) => 
    Observable.Create<IObservable<T>>(observer =>
    {
        Subject<T>? currentWindow = null;
        return source.Subscribe(Observer.Create<T>(
            next =>
            {
                if (currentWindow == null || isWindowStart(next))
                {
                    currentWindow?.OnCompleted();
                    currentWindow = new Subject<T>();
                    observer.OnNext(currentWindow);
                }
                currentWindow.OnNext(next);
            },
            ex => { currentWindow?.OnError(ex); observer.OnError(ex); },
            () => { currentWindow?.OnCompleted(); observer.OnCompleted(); }
        ));
    });

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant