diff --git a/spec/operators/distinct-spec.ts b/spec/operators/distinct-spec.ts index 8cc66537ad2..e59aba6a8ef 100644 --- a/spec/operators/distinct-spec.ts +++ b/spec/operators/distinct-spec.ts @@ -3,6 +3,7 @@ import { distinct, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { of, Observable } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; +import { asInteropObservable } from '../helpers/interop-helper'; /** @test {distinct} */ describe('distinct', () => { @@ -218,6 +219,20 @@ describe('distinct', () => { }); }); + it('should support a flushing interop stream', () => { + testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' --a--b--a--b--a--b--|'); + const e1subs = ' ^-------------------!'; + const e2 = hot(' -----------x--------|'); + const e2subs = ' ^-------------------!'; + const expected = '--a--b--------a--b--|'; + + expectObservable(e1.pipe(distinct(undefined, asInteropObservable(e2)))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + }); + it('should raise error if flush raises error', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const e1 = hot(' --a--b--a--b--a--b--|'); @@ -232,6 +247,20 @@ describe('distinct', () => { }); }); + it('should raise error if interop flush raises error', () => { + testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' --a--b--a--b--a--b--|'); + const e1subs = ' ^------------! '; + const e2 = hot(' -----------x-# '); + const e2subs = ' ^------------! '; + const expected = '--a--b-------# '; + + expectObservable(e1.pipe(distinct(undefined, asInteropObservable(e2)))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + }); + it('should unsubscribe from the flushing stream when the main stream is unsubbed', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const e1 = hot(' --a--b--a--b--a--b--|'); @@ -247,6 +276,21 @@ describe('distinct', () => { }); }); + it('should unsubscribe from the flushing interop stream when the main stream is unsubbed', () => { + testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' --a--b--a--b--a--b--|'); + const e1subs = ' ^----------! '; + const e2 = hot(' -----------x--------|'); + const e2subs = ' ^----------! '; + const unsub = ' -----------! '; + const expected = '--a--b------ '; + + expectObservable(e1.pipe(distinct(undefined, asInteropObservable(e2))), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + }); + it('should allow opting in to default comparator with flush', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const e1 = hot(' --a--b--a--b--a--b--|'); @@ -261,6 +305,20 @@ describe('distinct', () => { }); }); + it('should allow opting in to default comparator with interop flush', () => { + testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' --a--b--a--b--a--b--|'); + const e1subs = ' ^-------------------!'; + const e2 = hot(' -----------x--------|'); + const e2subs = ' ^-------------------!'; + const expected = '--a--b--------a--b--|'; + + expectObservable(e1.pipe(distinct(undefined, asInteropObservable(e2)))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + }); + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable((subscriber) => {