Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplified BaseBlocking, FirstBlocking, and LastBlocking #891

Merged
merged 4 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2,105 +2,52 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
internal abstract class BaseBlocking<T> : CountdownEvent, IObserver<T>
internal abstract class BaseBlocking<T> : ManualResetEventSlim, IObserver<T>
{
protected IDisposable _upstream;

internal T _value;
internal bool _hasValue;
internal Exception _error;
private int _once;

internal BaseBlocking() : base(1) { }

internal void SetUpstream(IDisposable d)
{
Disposable.SetSingle(ref _upstream, d);
}
internal BaseBlocking() { }

protected void Unblock()
public void OnCompleted()
{
if (Interlocked.CompareExchange(ref _once, 1, 0) == 0)
{
Signal();
}
Set();
}

public abstract void OnCompleted();
public virtual void OnError(Exception error)
public void OnError(Exception error)
{
_value = default;
_error = error;
Unblock();
Set();
}
public abstract void OnNext(T value);

public new void Dispose()
{
base.Dispose();
if (!Disposable.GetIsDisposed(ref _upstream))
{
Disposable.TryDispose(ref _upstream);
}
}
public abstract void OnNext(T value);
}

internal sealed class FirstBlocking<T> : BaseBlocking<T>
{
public override void OnCompleted()
{
Unblock();
if (!Disposable.GetIsDisposed(ref _upstream))
{
Disposable.TryDispose(ref _upstream);
}
}

public override void OnError(Exception error)
{
base.OnError(error);
if (!Disposable.GetIsDisposed(ref _upstream))
{
Disposable.TryDispose(ref _upstream);
}
}

public override void OnNext(T value)
{
if (!_hasValue)
{
_value = value;
_hasValue = true;
Disposable.TryDispose(ref _upstream);
Unblock();
Set();
}
}
}

internal sealed class LastBlocking<T> : BaseBlocking<T>
{
public override void OnCompleted()
{
Unblock();
Disposable.TryDispose(ref _upstream);
}

public override void OnError(Exception error)
{
base.OnError(error);
Disposable.TryDispose(ref _upstream);
}

public override void OnNext(T value)
{
_value = value;
_hasValue = true;
}

}
}
18 changes: 4 additions & 14 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs
Expand Up @@ -69,14 +69,9 @@ private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> sour
{
using (var consumer = new FirstBlocking<TSource>())
{
using (var d = source.Subscribe(consumer))
using (source.Subscribe(consumer))
{
consumer.SetUpstream(d);

if (consumer.CurrentCount != 0)
{
consumer.Wait();
}
consumer.Wait();
}

consumer._error.ThrowIfNotNull();
Expand Down Expand Up @@ -166,14 +161,9 @@ private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> sourc
using (var consumer = new LastBlocking<TSource>())
{

using (var d = source.Subscribe(consumer))
using (source.Subscribe(consumer))
{
consumer.SetUpstream(d);

if (consumer.CurrentCount != 0)
{
consumer.Wait();
}
consumer.Wait();
}

consumer._error.ThrowIfNotNull();
Expand Down