From 14bf2d3cd63b3f2e61b688cc3a44324f46c07505 Mon Sep 17 00:00:00 2001 From: Aaron Schiff Date: Fri, 26 Apr 2019 22:09:18 -0500 Subject: [PATCH] Simplified BaseBlocking, FirstBlocking, and LastBlocking Switched BaseBlocking to derive from ManualResetEventSlim, which is sufficient for the task at hand --- .../Linq/Observable/FirstLastBlocking.cs | 69 +++---------------- .../Linq/QueryLanguage.Blocking.cs | 18 ++--- 2 files changed, 12 insertions(+), 75 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs index 5377c214c4..8b25f536fa 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs @@ -2,105 +2,52 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. -using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { - internal abstract class BaseBlocking : CountdownEvent, IObserver + internal abstract class BaseBlocking : ManualResetEventSlim, IObserver { - protected IDisposable _upstream; - internal T _value; internal bool _hasValue; internal Exception _error; - private int _once; - - internal BaseBlocking() : base(1) { } - internal void SetUpstream(IDisposable d) - { - Disposable.SetSingle(ref _upstream, d); - } + internal BaseBlocking() { } - protected void Unblock() + public void OnCompleted() { - if (Interlocked.CompareExchange(ref _once, 1, 0) == 0) - { - Signal(); - } + Set(); } - public abstract void OnCompleted(); - public virtual void OnError(Exception error) + public void OnError(Exception error) { _value = default; _error = error; - Unblock(); + Set(); } - public abstract void OnNext(T value); - public new void Dispose() - { - base.Dispose(); - if (!Disposable.GetIsDisposed(ref _upstream)) - { - Disposable.TryDispose(ref _upstream); - } - } + public abstract void OnNext(T value); } internal sealed class FirstBlocking : BaseBlocking { - public override void OnCompleted() - { - Unblock(); - if (!Disposable.GetIsDisposed(ref _upstream)) - { - Disposable.TryDispose(ref _upstream); - } - } - - public override void OnError(Exception error) - { - base.OnError(error); - if (!Disposable.GetIsDisposed(ref _upstream)) - { - Disposable.TryDispose(ref _upstream); - } - } - public override void OnNext(T value) { if (!_hasValue) { _value = value; _hasValue = true; - Disposable.TryDispose(ref _upstream); - Unblock(); + Set(); } } } internal sealed class LastBlocking : BaseBlocking { - public override void OnCompleted() - { - Unblock(); - Disposable.TryDispose(ref _upstream); - } - - public override void OnError(Exception error) - { - base.OnError(error); - Disposable.TryDispose(ref _upstream); - } - public override void OnNext(T value) { _value = value; _hasValue = true; } - } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs index 87bcbde084..1eeb59b8dd 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs @@ -69,14 +69,9 @@ private static TSource FirstOrDefaultInternal(IObservable sour { using (var consumer = new FirstBlocking()) { - using (var d = source.Subscribe(consumer)) + using (source.Subscribe(consumer)) { - consumer.SetUpstream(d); - - if (consumer.CurrentCount != 0) - { - consumer.Wait(); - } + consumer.Wait(); } consumer._error.ThrowIfNotNull(); @@ -166,14 +161,9 @@ private static TSource LastOrDefaultInternal(IObservable sourc using (var consumer = new LastBlocking()) { - using (var d = source.Subscribe(consumer)) + using (source.Subscribe(consumer)) { - consumer.SetUpstream(d); - - if (consumer.CurrentCount != 0) - { - consumer.Wait(); - } + consumer.Wait(); } consumer._error.ThrowIfNotNull();