Skip to content

Commit

Permalink
fix: scheduling with Rx-provided schedulers will no longer leak actio…
Browse files Browse the repository at this point in the history
…n references

Resolves ReactiveX#6561
  • Loading branch information
benlesh committed Aug 13, 2021
1 parent 5d0e41e commit 6526f14
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 159 deletions.
3 changes: 2 additions & 1 deletion src/internal/observable/combineLatest.ts
Expand Up @@ -10,6 +10,7 @@ import { popResultSelector, popScheduler } from '../util/args';
import { createObject } from '../util/createObject';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { AnyCatcher } from '../AnyCatcher';
import { executeSchedule } from '../util/executeSchedule';

// combineLatest(any)
// We put this first because we need to catch cases where the user has supplied
Expand Down Expand Up @@ -293,7 +294,7 @@ export function combineLatestInit(
*/
function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
if (scheduler) {
subscription.add(scheduler.schedule(execute));
executeSchedule(subscription, scheduler, execute);
} else {
execute();
}
Expand Down
22 changes: 10 additions & 12 deletions src/internal/operators/bufferTime.ts
Expand Up @@ -5,6 +5,7 @@ import { OperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';
import { asyncScheduler } from '../scheduler/async';
import { popScheduler } from '../util/args';
import { executeSchedule } from '../util/executeSchedule';

/* tslint:disable:max-line-length */
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
Expand Down Expand Up @@ -117,21 +118,18 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
subs,
};
bufferRecords.push(record);
subs.add(scheduler.schedule(() => emit(record), bufferTimeSpan));
executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan);
}
};

bufferCreationInterval !== null && bufferCreationInterval >= 0
? // The user passed both a bufferTimeSpan (required), and a creation interval
// That means we need to start new buffers on the interval, and those buffers need
// to wait the required time span before emitting.
subscriber.add(
scheduler.schedule(function () {
startBuffer();
!this.closed && subscriber.add(this.schedule(null, bufferCreationInterval));
}, bufferCreationInterval)
)
: (restartOnEmit = true);
if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
// The user passed both a bufferTimeSpan (required), and a creation interval
// That means we need to start new buffers on the interval, and those buffers need
// to wait the required time span before emitting.
executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true);
} else {
restartOnEmit = true;
}

startBuffer();

Expand Down
7 changes: 6 additions & 1 deletion src/internal/operators/mergeInternals.ts
Expand Up @@ -2,6 +2,7 @@ import { Observable } from '../Observable';
import { innerFrom } from '../observable/from';
import { Subscriber } from '../Subscriber';
import { ObservableInput, SchedulerLike } from '../types';
import { executeSchedule } from '../util/executeSchedule';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
Expand Down Expand Up @@ -114,7 +115,11 @@ export function mergeInternals<T, R>(
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue);
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
}
// Check to see if we can complete, and complete if so.
checkComplete();
Expand Down
46 changes: 4 additions & 42 deletions src/internal/operators/observeOn.ts
@@ -1,5 +1,6 @@
/** @prettier */
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { executeSchedule } from '../util/executeSchedule';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

Expand Down Expand Up @@ -57,51 +58,12 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*/
export function observeOn<T>(scheduler: SchedulerLike, delay = 0): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
/**
* Executes work with the provided scheduler and provided delay.
* This exists primarily to manage the Actions being scheduled to make
* sure they are removed from the parent Subscription.
* Actions will be retained in memory until the parent Subscription finalizes
* because they could be rescheduled at any time. We know the
* actions in this operator are NOT going to be rescheduled, so
* we want to make sure they're removed as soon as possible.
* @param execute The work to schedule with the scheduler provided
*/
const schedule = (execute: () => void) => {
let syncUnsub = false;
const actionSubs = scheduler.schedule(() => {
if (actionSubs) {
// The action fired asynchronously, so we have a subscription
// we can unsubscribe before continuing. Unsubscription will
// remove the Action/Subscription from the parent (subscriber).
actionSubs.unsubscribe();
} else {
// The action fired synchronously, so we don't have a
// subscription we can unsubscribe just yet. Flag that
// we want to unsubscribe when we do get it.
syncUnsub = true;
}
// Execute the work required.
execute();
}, delay);

if (syncUnsub) {
// The action above fired synchronously, so we can tear it down.
actionSubs.unsubscribe();
} else {
// The action hasn't fired yet. It's asynchronous. So we should
// add it to our subscriber, which is the parent Subscription,
// so it is unsubscribed if our consumer unsubscribes.
subscriber.add(actionSubs);
}
};

source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => schedule(() => subscriber.next(value)),
() => schedule(() => subscriber.complete()),
(err) => schedule(() => subscriber.error(err))
(value) => executeSchedule(subscriber, scheduler, () => subscriber.next(value), delay),
() => executeSchedule(subscriber, scheduler, () => subscriber.complete(), delay),
(err) => executeSchedule(subscriber, scheduler, () => subscriber.error(err), delay)
)
);
});
Expand Down
24 changes: 14 additions & 10 deletions src/internal/operators/timeout.ts
Expand Up @@ -6,8 +6,8 @@ import { operate } from '../util/lift';
import { Observable } from '../Observable';
import { innerFrom } from '../observable/from';
import { createErrorClass } from '../util/createErrorClass';
import { caughtSchedule } from '../util/caughtSchedule';
import { OperatorSubscriber } from './OperatorSubscriber';
import { executeSchedule } from '../util/executeSchedule';

export interface TimeoutConfig<T, O extends ObservableInput<unknown> = ObservableInput<T>, M = unknown> {
/**
Expand Down Expand Up @@ -342,18 +342,22 @@ export function timeout<T, O extends ObservableInput<any>, M>(
// tell how many values we have seen so far.
let seen = 0;
const startTimer = (delay: number) => {
timerSubscription = caughtSchedule(
timerSubscription = executeSchedule(
subscriber,
scheduler,
() => {
originalSourceSubscription.unsubscribe();
innerFrom(
_with!({
meta,
lastValue,
seen,
})
).subscribe(subscriber);
try {
originalSourceSubscription.unsubscribe();
innerFrom(
_with!({
meta,
lastValue,
seen,
})
).subscribe(subscriber);
} catch (err) {
subscriber.error(err);
}
},
delay
);
Expand Down
22 changes: 10 additions & 12 deletions src/internal/operators/windowTime.ts
Expand Up @@ -7,6 +7,7 @@ import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';
import { popScheduler } from '../util/args';
import { executeSchedule } from '../util/executeSchedule';

export function windowTime<T>(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, Observable<T>>;
export function windowTime<T>(
Expand Down Expand Up @@ -136,21 +137,18 @@ export function windowTime<T>(windowTimeSpan: number, ...otherArgs: any[]): Oper
};
windowRecords.push(record);
subscriber.next(window.asObservable());
subs.add(scheduler.schedule(() => closeWindow(record), windowTimeSpan));
executeSchedule(subs, scheduler, () => closeWindow(record), windowTimeSpan);
}
};

windowCreationInterval !== null && windowCreationInterval >= 0
? // The user passed both a windowTimeSpan (required), and a creation interval
// That means we need to start new window on the interval, and those windows need
// to wait the required time span before completing.
subscriber.add(
scheduler.schedule(function () {
startWindow();
!this.closed && subscriber.add(this.schedule(null, windowCreationInterval));
}, windowCreationInterval)
)
: (restartOnClose = true);
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
// The user passed both a windowTimeSpan (required), and a creation interval
// That means we need to start new window on the interval, and those windows need
// to wait the required time span before completing.
executeSchedule(subscriber, scheduler, startWindow, windowCreationInterval, true);
} else {
restartOnClose = true;
}

startWindow();

Expand Down
28 changes: 16 additions & 12 deletions src/internal/scheduled/scheduleAsyncIterable.ts
@@ -1,28 +1,32 @@
import { SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { executeSchedule } from '../util/executeSchedule';

export function scheduleAsyncIterable<T>(input: AsyncIterable<T>, scheduler: SchedulerLike) {
if (!input) {
throw new Error('Iterable cannot be null');
}
return new Observable<T>(subscriber => {
const sub = new Subscription();
sub.add(
scheduler.schedule(() => {
const iterator = input[Symbol.asyncIterator]();
sub.add(scheduler.schedule(function () {
iterator.next().then(result => {
return new Observable<T>((subscriber) => {
executeSchedule(subscriber, scheduler, () => {
const iterator = input[Symbol.asyncIterator]();
executeSchedule(
subscriber,
scheduler,
() => {
iterator.next().then((result) => {
if (result.done) {
// This will remove the subscriptions from
// the parent subscription.
subscriber.complete();
} else {
subscriber.next(result.value);
this.schedule();
}
});
}));
})
);
return sub;
},
0,
true
);
});
});
}
39 changes: 24 additions & 15 deletions src/internal/scheduled/scheduleIterable.ts
Expand Up @@ -2,7 +2,7 @@ import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { iterator as Symbol_iterator } from '../symbol/iterator';
import { isFunction } from '../util/isFunction';
import { caughtSchedule } from '../util/caughtSchedule';
import { executeSchedule } from '../util/executeSchedule';

/**
* Used in {@link scheduled} to create an observable from an Iterable.
Expand All @@ -16,15 +16,25 @@ export function scheduleIterable<T>(input: Iterable<T>, scheduler: SchedulerLike
// Schedule the initial creation of the iterator from
// the iterable. This is so the code in the iterable is
// not called until the scheduled job fires.
subscriber.add(
scheduler.schedule(() => {
// Create the iterator.
iterator = (input as any)[Symbol_iterator]();
executeSchedule(subscriber, scheduler, () => {
// Create the iterator.
iterator = (input as any)[Symbol_iterator]();

executeSchedule(
subscriber,
scheduler,
() => {
let value: T;
let done: boolean | undefined;
try {
// Pull the value out of the iterator
({ value, done } = iterator.next());
} catch (err) {
// We got an error while pulling from the iterator
subscriber.error(err);
return;
}

// Schedule the first iteration and emission.
caughtSchedule(subscriber, scheduler, function () {
// Pull the value out of the iterator
const { value, done } = iterator.next();
if (done) {
// If it is "done" we just complete. This mimics the
// behavior of JavaScript's `for..of` consumption of
Expand All @@ -34,13 +44,12 @@ export function scheduleIterable<T>(input: Iterable<T>, scheduler: SchedulerLike
} else {
// The iterable is not done, emit the value.
subscriber.next(value);
// Reschedule. This will cause this function to be
// called again on the same scheduled delay.
this.schedule();
}
});
})
);
},
0,
true
);
});

// During teardown, if we see this iterator has a `return` method,
// then we know it is a Generator, and not just an Iterator. So we call
Expand Down
21 changes: 5 additions & 16 deletions src/internal/scheduled/scheduleObservable.ts
@@ -1,19 +1,8 @@
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { observable as Symbol_observable } from '../symbol/observable';
import { InteropObservable, SchedulerLike, Subscribable } from '../types';
import { innerFrom } from '../observable/from';
import { observeOn } from '../operators/observeOn';
import { subscribeOn } from '../operators/subscribeOn';
import { InteropObservable, SchedulerLike } from '../types';

export function scheduleObservable<T>(input: InteropObservable<T>, scheduler: SchedulerLike) {
return new Observable<T>(subscriber => {
const sub = new Subscription();
sub.add(scheduler.schedule(() => {
const observable: Subscribable<T> = (input as any)[Symbol_observable]();
sub.add(observable.subscribe({
next(value) { sub.add(scheduler.schedule(() => subscriber.next(value))); },
error(err) { sub.add(scheduler.schedule(() => subscriber.error(err))); },
complete() { sub.add(scheduler.schedule(() => subscriber.complete())); },
}));
}));
return sub;
});
return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler));
}
22 changes: 4 additions & 18 deletions src/internal/scheduled/schedulePromise.ts
@@ -1,22 +1,8 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/from';
import { observeOn } from '../operators/observeOn';
import { subscribeOn } from '../operators/subscribeOn';
import { SchedulerLike } from '../types';

export function schedulePromise<T>(input: PromiseLike<T>, scheduler: SchedulerLike) {
return new Observable<T>((subscriber) => {
return scheduler.schedule(() =>
input.then(
(value) => {
subscriber.add(
scheduler.schedule(() => {
subscriber.next(value);
subscriber.add(scheduler.schedule(() => subscriber.complete()));
})
);
},
(err) => {
subscriber.add(scheduler.schedule(() => subscriber.error(err)));
}
)
);
});
return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler));
}
20 changes: 0 additions & 20 deletions src/internal/util/caughtSchedule.ts

This file was deleted.

0 comments on commit 6526f14

Please sign in to comment.