Skip to content

Commit

Permalink
feat(rxjs): expand no longer supports a scheduler parameter (#7431)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: `expand` no longer supports a scheduler parameter. Use `scheduled` or `subscribeOn` or `observeOn` instead.
  • Loading branch information
benlesh committed Jan 22, 2024
1 parent 7f36e90 commit 4aec7bd
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 82 deletions.
41 changes: 1 addition & 40 deletions packages/rxjs/spec/operators/expand-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,6 @@ describe('expand', () => {
});
});

it('should work with scheduler', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --x----| ', { x: 1 });
const e1subs = ' ^------! ';
const e2 = cold(' --c| ', { c: 2 });
// --c|
// --c|
const expected = '--a-b-c-d|';
const values = { a: 1, b: 2, c: 4, d: 8 };

const result = e1.pipe(expand((x) => (x === 8 ? EMPTY : e2.pipe(map((c) => c * x))), Infinity, testScheduler));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should map and recursively flatten', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const values = {
Expand Down Expand Up @@ -470,35 +453,13 @@ describe('expand', () => {
return cold(e2shape, { z: x + x });
};

const result = e1.pipe(expand(project, undefined, undefined));
const result = e1.pipe(expand(project, undefined));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should work with the AsapScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0)
.pipe(
expand((x) => of(x + 1), Infinity, asapScheduler),
take(10),
toArray()
)
.subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done });
});

it('should work with the AsyncScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0)
.pipe(
expand((x) => of(x + 1), Infinity, asyncScheduler),
take(10),
toArray()
)
.subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done });
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
Expand Down
21 changes: 4 additions & 17 deletions packages/rxjs/src/internal/operators/expand.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
import type { OperatorFunction, ObservableInput, ObservedValueOf, SchedulerLike } from '../types.js';
import type { OperatorFunction, ObservableInput, ObservedValueOf } from '../types.js';
import { Observable } from '../Observable.js';
import { mergeInternals } from './mergeInternals.js';

export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent?: number,
scheduler?: SchedulerLike
): OperatorFunction<T, ObservedValueOf<O>>;
/**
* @deprecated The `scheduler` parameter will be removed in v8. If you need to schedule the inner subscription,
* use `subscribeOn` within the projection function: `expand((value) => fn(value).pipe(subscribeOn(scheduler)))`.
* Details: Details: https://rxjs.dev/deprecations/scheduler-argument
*/
export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent: number | undefined,
scheduler: SchedulerLike
concurrent?: number
): OperatorFunction<T, ObservedValueOf<O>>;

/**
Expand Down Expand Up @@ -70,8 +59,7 @@ export function expand<T, O extends ObservableInput<unknown>>(
*/
export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent = Infinity,
scheduler?: SchedulerLike
concurrent = Infinity
): OperatorFunction<T, ObservedValueOf<O>> {
concurrent = (concurrent || 0) < 1 ? Infinity : concurrent;
return (source) =>
Expand All @@ -89,8 +77,7 @@ export function expand<T, O extends ObservableInput<unknown>>(
undefined,

// Expand-specific
true, // Use expand path
scheduler // Inner subscription scheduler
true // Use expand path
)
);
}
25 changes: 4 additions & 21 deletions packages/rxjs/src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Observable, Subscriber} from '../Observable.js';
import type { Observable, Subscriber } from '../Observable.js';
import { from, operate } from '../Observable.js';
import type { ObservableInput, SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';
import type { ObservableInput } from '../types.js';

/**
* A process embodying the general "merge" strategy. This is used in
Expand All @@ -22,9 +21,7 @@ export function mergeInternals<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike,
additionalFinalizer?: () => void
expand?: boolean
) {
// Buffered values, in the event of going over our concurrency limit
const buffer: T[] = [];
Expand Down Expand Up @@ -107,15 +104,7 @@ export function mergeInternals<T, R>(
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(destination, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
doInnerSub(buffer.shift()!);
}
// Check to see if we can complete, and complete if so.
checkComplete();
Expand All @@ -140,10 +129,4 @@ export function mergeInternals<T, R>(
},
})
);

// Additional finalization (for when the destination is torn down).
// Other finalization is added implicitly via subscription above.
return () => {
additionalFinalizer?.();
};
}
10 changes: 6 additions & 4 deletions packages/rxjs/src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,19 @@ export function mergeScan<T, R>(
// The accumulated state.
let state = seed;

return mergeInternals(
mergeInternals(
source,
subscriber,
(value, index) => accumulator(state, value, index),
concurrent,
(value) => {
state = value;
},
false,
undefined,
() => (state = null!)
false
);

return () => {
state = null!;
};
});
}

0 comments on commit 4aec7bd

Please sign in to comment.