Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: scheduler unsubscription scenarios #6674

Merged
merged 7 commits into from Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 71 additions & 1 deletion spec/schedulers/AnimationFrameScheduler-spec.ts
Expand Up @@ -43,7 +43,7 @@ describe('Scheduler.animationFrame', () => {
const requestSpy = sinon.spy(animationFrameProvider, 'requestAnimationFrame');
const setSpy = sinon.spy(intervalProvider, 'setInterval');
const clearSpy = sinon.spy(intervalProvider, 'clearInterval');

animate(' ----------x--');
const a = cold(' a ');
const ta = time(' ----| ');
Expand Down Expand Up @@ -168,4 +168,74 @@ describe('Scheduler.animationFrame', () => {
}, 0, 0);
scheduledIndices.push(0);
});

it('should execute actions scheduled when flushing in a subsequent flush', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(animationFrameScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
});
b = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
});
});

it('should execute actions scheduled when flushing in a subsequent flush when some actions are unsubscribed', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(animationFrameScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
b.unsubscribe();
});
b = animationFrameScheduler.schedule(() => {
done(new Error('Unexpected execution of b'));
});
});

it('should properly cancel an unnecessary flush', (done) => {
const sandbox = sinon.createSandbox();
const cancelAnimationFrameStub = sandbox.stub(animationFrameProvider, 'cancelAnimationFrame').callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = animationFrameScheduler.schedule(() => {
expect(animationFrameScheduler.actions).to.have.length(1);
c = animationFrameScheduler.schedule(() => {
done(new Error('Unexpected execution of c'));
});
expect(animationFrameScheduler.actions).to.have.length(2);
// What we're testing here is that the unsubscription of action c effects
// the cancellation of the animation frame in a scenario in which the
// actions queue is not empty - it contains action b.
c.unsubscribe();
expect(animationFrameScheduler.actions).to.have.length(1);
expect(cancelAnimationFrameStub).to.have.callCount(1);
});
b = animationFrameScheduler.schedule(() => {
sandbox.restore();
done();
});
});
});
87 changes: 77 additions & 10 deletions spec/schedulers/AsapScheduler-spec.ts
Expand Up @@ -39,9 +39,10 @@ describe('Scheduler.asap', () => {

it('should cancel asap actions when delay > 0', () => {
testScheduler.run(({ cold, expectObservable, flush, time }) => {
const setImmediateSpy = sinon.spy(immediateProvider, 'setImmediate');
const setSpy = sinon.spy(intervalProvider, 'setInterval');
const clearSpy = sinon.spy(intervalProvider, 'clearInterval');
const sandbox = sinon.createSandbox();
const setImmediateSpy = sandbox.spy(immediateProvider, 'setImmediate');
const setSpy = sandbox.spy(intervalProvider, 'setInterval');
const clearSpy = sandbox.spy(intervalProvider, 'clearInterval');
Comment on lines +42 to +45
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sandbox wasn't used consistently throughout this file, so I added a commit - 0dd8f02 - to address this.


const a = cold(' a ');
const ta = time(' ----| ');
Expand All @@ -57,17 +58,15 @@ describe('Scheduler.asap', () => {
expect(setImmediateSpy).to.have.not.been.called;
expect(setSpy).to.have.been.calledOnce;
expect(clearSpy).to.have.been.calledOnce;
setImmediateSpy.restore();
setSpy.restore();
clearSpy.restore();
sandbox.restore();
});
});

it('should reuse the interval for recursively scheduled actions with the same delay', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
// callThrough is missing from the declarations installed by the typings tool in stable
const stubSetInterval = (<any> sinon.stub(global, 'setInterval')).callThrough();
const stubSetInterval = (<any> sandbox.stub(global, 'setInterval')).callThrough();
const period = 50;
const state = { index: 0, period };
type State = typeof state;
Expand All @@ -86,15 +85,14 @@ describe('Scheduler.asap', () => {
fakeTimer.tick(period);
expect(state).to.have.property('index', 2);
expect(stubSetInterval).to.have.property('callCount', 1);
stubSetInterval.restore();
sandbox.restore();
});

it('should not reuse the interval for recursively scheduled actions with a different delay', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
// callThrough is missing from the declarations installed by the typings tool in stable
const stubSetInterval = (<any> sinon.stub(global, 'setInterval')).callThrough();
const stubSetInterval = (<any> sandbox.stub(global, 'setInterval')).callThrough();
const period = 50;
const state = { index: 0, period };
type State = typeof state;
Expand All @@ -114,7 +112,6 @@ describe('Scheduler.asap', () => {
fakeTimer.tick(period);
expect(state).to.have.property('index', 2);
expect(stubSetInterval).to.have.property('callCount', 3);
stubSetInterval.restore();
sandbox.restore();
});

Expand Down Expand Up @@ -221,4 +218,74 @@ describe('Scheduler.asap', () => {
}, 0, 0);
scheduledIndices.push(0);
});

it('should execute actions scheduled when flushing in a subsequent flush', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(asapScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
});
b = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
});
});

it('should execute actions scheduled when flushing in a subsequent flush when some actions are unsubscribed', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(asapScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
b.unsubscribe();
});
b = asapScheduler.schedule(() => {
done(new Error('Unexpected execution of b'));
});
});

it('should properly cancel an unnecessary flush', (done) => {
const sandbox = sinon.createSandbox();
const clearImmediateStub = sandbox.stub(immediateProvider, 'clearImmediate').callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = asapScheduler.schedule(() => {
expect(asapScheduler.actions).to.have.length(1);
c = asapScheduler.schedule(() => {
done(new Error('Unexpected execution of c'));
});
expect(asapScheduler.actions).to.have.length(2);
// What we're testing here is that the unsubscription of action c effects
// the cancellation of the microtask in a scenario in which the actions
// queue is not empty - it contains action b.
c.unsubscribe();
expect(asapScheduler.actions).to.have.length(1);
expect(clearImmediateStub).to.have.callCount(1);
});
b = asapScheduler.schedule(() => {
sandbox.restore();
done();
});
});
});
8 changes: 4 additions & 4 deletions src/internal/scheduler/AnimationFrameAction.ts
Expand Up @@ -27,10 +27,10 @@ export class AnimationFrameAction<T> extends AsyncAction<T> {
if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
return super.recycleAsyncId(scheduler, id, delay);
}
// If the scheduler queue is empty, cancel the requested animation frame and
// set the scheduled flag to undefined so the next AnimationFrameAction will
// request its own.
if (scheduler.actions.length === 0) {
// If the scheduler queue has no remaining actions with the same async id,
// cancel the requested animation frame and set the scheduled flag to
// undefined so the next AnimationFrameAction will request its own.
if (!scheduler.actions.some((action) => action.id === id)) {
animationFrameProvider.cancelAnimationFrame(id);
scheduler._scheduled = undefined;
}
Expand Down
16 changes: 12 additions & 4 deletions src/internal/scheduler/AnimationFrameScheduler.ts
Expand Up @@ -4,24 +4,32 @@ import { AsyncScheduler } from './AsyncScheduler';
export class AnimationFrameScheduler extends AsyncScheduler {
public flush(action?: AsyncAction<any>): void {
this._active = true;
// The async id that effects a call to flush is stored in _scheduled.
// Before executing an action, it's necessary to check the action's async
// id to determine whether it's supposed to be executed in the current
// flush.
// Previous implementations of this method used a count to determine this,
// but that was unsound, as actions that are unsubscribed - i.e. cancelled -
// are removed from the actions array and that can shift actions that are
// scheduled to be executed in a subsequent flush into positions at which
// they are executed within the current flush.
const flushId = this._scheduled;
this._scheduled = undefined;

const { actions } = this;
let error: any;
let index = -1;
action = action || actions.shift()!;
const count = actions.length;

do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while (++index < count && (action = actions.shift()));
} while ((action = actions[0]) && action.id === flushId && actions.shift());

this._active = false;

if (error) {
while (++index < count && (action = actions.shift())) {
while ((action = actions[0]) && action.id === flushId && actions.shift()) {
action.unsubscribe();
}
throw error;
Expand Down
8 changes: 4 additions & 4 deletions src/internal/scheduler/AsapAction.ts
Expand Up @@ -27,10 +27,10 @@ export class AsapAction<T> extends AsyncAction<T> {
if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
return super.recycleAsyncId(scheduler, id, delay);
}
// If the scheduler queue is empty, cancel the requested microtask and
// set the scheduled flag to undefined so the next AsapAction will schedule
// its own.
if (scheduler.actions.length === 0) {
// If the scheduler queue has no remaining actions with the same async id,
// cancel the requested microtask and set the scheduled flag to undefined
// so the next AsapAction will request its own.
if (!scheduler.actions.some((action) => action.id === id)) {
immediateProvider.clearImmediate(id);
scheduler._scheduled = undefined;
}
Expand Down
16 changes: 12 additions & 4 deletions src/internal/scheduler/AsapScheduler.ts
Expand Up @@ -4,24 +4,32 @@ import { AsyncScheduler } from './AsyncScheduler';
export class AsapScheduler extends AsyncScheduler {
public flush(action?: AsyncAction<any>): void {
this._active = true;
// The async id that effects a call to flush is stored in _scheduled.
// Before executing an action, it's necessary to check the action's async
// id to determine whether it's supposed to be executed in the current
// flush.
// Previous implementations of this method used a count to determine this,
// but that was unsound, as actions that are unsubscribed - i.e. cancelled -
// are removed from the actions array and that can shift actions that are
// scheduled to be executed in a subsequent flush into positions at which
// they are executed within the current flush.
const flushId = this._scheduled;
this._scheduled = undefined;

const { actions } = this;
let error: any;
let index = -1;
action = action || actions.shift()!;
const count = actions.length;

do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while (++index < count && (action = actions.shift()));
} while ((action = actions[0]) && action.id === flushId && actions.shift());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It really feels like we should clean up this loop, but for now I think this change is solid.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No arguments from me re: clean up.


this._active = false;

if (error) {
while (++index < count && (action = actions.shift())) {
while ((action = actions[0]) && action.id === flushId && actions.shift()) {
action.unsubscribe();
}
throw error;
Expand Down