Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cursor to use fetchBatch function when current batch is empty #4093

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
418 changes: 170 additions & 248 deletions src/cursor/abstract_cursor.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { Sort } from '../sort';
import type { MongoDBNamespace } from '../utils';
import { mergeOptions } from '../utils';
import type { AbstractCursorOptions } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
import { AbstractCursor } from './abstract_cursor';

/** @public */
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
Expand Down Expand Up @@ -101,7 +101,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
addStage(stage: Document): this;
addStage<T = Document>(stage: Document): AggregationCursor<T>;
addStage<T = Document>(stage: Document): AggregationCursor<T> {
assertUninitialized(this);
this.throwIfInitialized();
this[kPipeline].push(stage);
return this as unknown as AggregationCursor<T>;
}
Expand Down
34 changes: 17 additions & 17 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { Hint } from '../operations/operation';
import type { ClientSession } from '../sessions';
import { formatSort, type Sort, type SortDirection } from '../sort';
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
import { AbstractCursor } from './abstract_cursor';

/** @internal */
const kFilter = Symbol('filter');
Expand Down Expand Up @@ -163,7 +163,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {

/** Set the cursor query */
filter(filter: Document): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kFilter] = filter;
return this;
}
Expand All @@ -174,7 +174,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param hint - If specified, then the query system will only consider plans using the hinted index.
*/
hint(hint: Hint): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].hint = hint;
return this;
}
Expand All @@ -185,7 +185,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param min - Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
*/
min(min: Document): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].min = min;
return this;
}
Expand All @@ -196,7 +196,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param max - Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
*/
max(max: Document): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].max = max;
return this;
}
Expand All @@ -209,7 +209,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - the returnKey value.
*/
returnKey(value: boolean): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].returnKey = value;
return this;
}
Expand All @@ -220,7 +220,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find.
*/
showRecordId(value: boolean): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].showRecordId = value;
return this;
}
Expand All @@ -232,7 +232,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The modifier value.
*/
addQueryModifier(name: string, value: string | boolean | number | Document): this {
assertUninitialized(this);
this.throwIfInitialized();
if (name[0] !== '$') {
throw new MongoInvalidArgumentError(`${name} is not a valid query modifier`);
}
Expand Down Expand Up @@ -295,7 +295,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The comment attached to this query.
*/
comment(value: string): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].comment = value;
return this;
}
Expand All @@ -306,7 +306,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - Number of milliseconds to wait before aborting the tailed query.
*/
maxAwaitTimeMS(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (typeof value !== 'number') {
throw new MongoInvalidArgumentError('Argument for maxAwaitTimeMS must be a number');
}
Expand All @@ -321,7 +321,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - Number of milliseconds to wait before aborting the query.
*/
override maxTimeMS(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (typeof value !== 'number') {
throw new MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
}
Expand Down Expand Up @@ -371,7 +371,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* ```
*/
project<T extends Document = Document>(value: Document): FindCursor<T> {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].projection = value;
return this as unknown as FindCursor<T>;
}
Expand All @@ -383,7 +383,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param direction - The direction of the sorting (1 or -1).
*/
sort(sort: Sort | string, direction?: SortDirection): this {
assertUninitialized(this);
this.throwIfInitialized();
if (this[kBuiltOptions].tailable) {
throw new MongoTailableCursorError('Tailable cursor does not support sorting');
}
Expand All @@ -399,7 +399,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* {@link https://www.mongodb.com/docs/manual/reference/command/find/#find-cmd-allowdiskuse | find command allowDiskUse documentation}
*/
allowDiskUse(allow = true): this {
assertUninitialized(this);
this.throwIfInitialized();

if (!this[kBuiltOptions].sort) {
throw new MongoInvalidArgumentError('Option "allowDiskUse" requires a sort specification');
Expand All @@ -421,7 +421,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
*/
collation(value: CollationOptions): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].collation = value;
return this;
}
Expand All @@ -432,7 +432,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The limit for the cursor query.
*/
limit(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (this[kBuiltOptions].tailable) {
throw new MongoTailableCursorError('Tailable cursor does not support limit');
}
Expand All @@ -451,7 +451,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The skip for the cursor query.
*/
skip(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (this[kBuiltOptions].tailable) {
throw new MongoTailableCursorError('Tailable cursor does not support skip');
}
Expand Down
6 changes: 5 additions & 1 deletion src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,11 @@ export function maybeClearPinnedConnection(
const servers = Array.from(topology.s.servers.values());
const loadBalancer = servers[0];

if (options?.error == null || options?.force) {
if (
options?.error == null ||
options?.error?.name === 'MongoExpiredSessionError' ||
options?.force
) {
Comment on lines +526 to +530
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this change, endSession is sensitive to the exact order of endSession calls that were made by our cursor before this refactor.

loadBalancer.pool.checkIn(conn);
session[kPinnedConnection] = undefined;
conn.emit(
Expand Down
5 changes: 2 additions & 3 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { PassThrough } from 'stream';
import { setTimeout } from 'timers';

import {
AbstractCursor,
type ChangeStream,
type ChangeStreamOptions,
type Collection,
Expand All @@ -33,9 +32,9 @@ import {
import { delay, filterForCommands } from '../shared';

const initIteratorMode = async (cs: ChangeStream) => {
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
const initEvent = once(cs.cursor, 'init');
await cs.cursor[kInit]();
//@ts-expect-error: private method
await cs.cursor.cursorInit();
await initEvent;
return;
};
Expand Down
6 changes: 2 additions & 4 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as sinon from 'sinon';
import { setTimeout } from 'timers';

import {
AbstractCursor,
type ChangeStream,
type CommandFailedEvent,
type CommandStartedEvent,
Expand All @@ -18,7 +17,6 @@ import {
Timestamp
} from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';
import { getSymbolFrom } from '../../tools/utils';
import { setupDatabase } from '../shared';

/**
Expand Down Expand Up @@ -72,9 +70,9 @@ function triggerResumableError(
}

const initIteratorMode = async (cs: ChangeStream) => {
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
const initEvent = once(cs.cursor, 'init');
await cs.cursor[kInit]();
//@ts-expect-error: private method
await cs.cursor.cursorInit();
await initEvent;
return;
};
Expand Down
2 changes: 1 addition & 1 deletion test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,7 @@ describe('Cursor', function () {
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);

await client.close();
expect(cursor).to.have.property('killed', true);
expect(cursor).to.have.property('closed', true);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
Expand Down
12 changes: 12 additions & 0 deletions test/integration/node-specific/cursor_async_iterator.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ describe('Cursor Async Iterator Tests', function () {
}
});

it('should not iterate if closed immediately', async function () {
const cursor = collection.find();
await cursor.close();

let count = 0;
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) count++;

expect(count).to.equal(0);
expect(cursor.closed).to.be.true;
});

it('should properly stop when cursor is closed', async function () {
const cursor = collection.find();

Expand Down
7 changes: 4 additions & 3 deletions test/tools/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AssertionError, expect } from 'chai';

import {
AbstractCursor,
type ChangeStream,
Collection,
CommandStartedEvent,
Db,
Expand Down Expand Up @@ -240,9 +241,9 @@ operations.set('createChangeStream', async ({ entities, operation }) => {
}

const { pipeline, ...args } = operation.arguments!;
const changeStream = watchable.watch(pipeline, args);
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
await changeStream.cursor[kInit]();
const changeStream: ChangeStream = watchable.watch(pipeline, args);
//@ts-expect-error: private method
await changeStream.cursor.cursorInit();
return changeStream;
});

Expand Down