From f8647fa3a9acd9beebf66106d92f99af57aa3481 Mon Sep 17 00:00:00 2001
From: Neal Beeken
Date: Wed, 27 Jul 2022 15:40:13 -0400
Subject: [PATCH] wip

---
 src/cursor/abstract_cursor.ts                 |  38 +--
 src/operations/command.ts                     |   1 +
 src/operations/get_more.ts                    |   6 +-
 .../change-streams/change_stream.test.ts      |  88 +++++-
 test/integration/crud/maxTimeMS.test.ts       | 252 ++++++++++++++++++
 test/integration/crud/maxtimems.test.js       | 120 ---------
 6 files changed, 361 insertions(+), 144 deletions(-)
 create mode 100644 test/integration/crud/maxTimeMS.test.ts
 delete mode 100644 test/integration/crud/maxtimems.test.js

diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts
index bc41499b50e..d5a1506bb7b 100644
--- a/src/cursor/abstract_cursor.ts
+++ b/src/cursor/abstract_cursor.ts
@@ -79,7 +79,22 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
   readPreference?: ReadPreferenceLike;
   readConcern?: ReadConcernLike;
   batchSize?: number;
+  /**
+   * For a **`tailable=false`** cursor OR a **`tailable=true && awaitData=false`** cursor:
+   * - the driver MUST set `maxTimeMS` on the `find` command and MUST NOT set `maxTimeMS` on the `getMore` command.
+   * - if `maxTimeMS` is not set in the options, the driver SHOULD refrain from setting `maxTimeMS`.
+   *
+   * For a **`tailable=true && awaitData=true`** cursor:
+   * - the driver MUST provide a cursor-level option named `maxAwaitTimeMS`.
+   * - the `maxTimeMS` option on the `getMore` command MUST be set to the value of the `maxAwaitTimeMS` option.
+   * - if no `maxAwaitTimeMS` is specified, the driver MUST NOT set `maxTimeMS` on the `getMore` command.
+   * - the `maxAwaitTimeMS` option is not set on the `aggregate` command nor on the `$changeStream` pipeline stage.
+   *
+   * ## `maxCommitTimeMS`
+   * Note: this option is an alias for the `maxTimeMS` option on the `commitTransaction` command.
+   */
   maxTimeMS?: number;
+  maxAwaitTimeMS?: number;
   /**
    * Comment to apply to the operation.
    *
@@ -155,7 +170,7 @@ export abstract class AbstractCursor<
     }
     this[kClient] = client;
     this[kNamespace] = namespace;
-    this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
+    this[kDocuments] = [];
     this[kInitialized] = false;
     this[kClosed] = false;
     this[kKilled] = false;
@@ -186,6 +201,10 @@ export abstract class AbstractCursor<
       this[kOptions].maxTimeMS = options.maxTimeMS;
     }
 
+    if (typeof options.maxAwaitTimeMS === 'number') {
+      this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
+    }
+
     if (options.session instanceof ClientSession) {
       this[kSession] = options.session;
     } else {
@@ -617,21 +636,8 @@ export abstract class AbstractCursor<
 
   /** @internal */
   _getMore(batchSize: number, callback: Callback): void {
-    const cursorId = this[kId];
-    const cursorNs = this[kNamespace];
-    const server = this[kServer];
-
-    if (cursorId == null) {
-      callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
-      return;
-    }
-
-    if (server == null) {
-      callback(new MongoRuntimeError('Unable to iterate cursor without selected server'));
-      return;
-    }
-
-    const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
       ...this[kOptions],
       session: this[kSession],
       batchSize
diff --git a/src/operations/command.ts b/src/operations/command.ts
index 57186ab42cd..f5f0d613c91 100644
--- a/src/operations/command.ts
+++ b/src/operations/command.ts
@@ -44,6 +44,7 @@ export interface CommandOperationOptions
   readConcern?: ReadConcernLike;
   /** Collation */
   collation?: CollationOptions;
+  /** TODO: this may belong elsewhere; the specs only mention `maxTimeMS` as an option for createIndex/dropIndex */
   maxTimeMS?: number;
   /**
    * Comment to apply to the operation.
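The doc comment added to `AbstractCursorOptions` describes which wire command each timeout option ends up on. The following sketch of that behavior from the public API is not part of the patch: the connection string, database, and collection names are placeholders, and `capped` is assumed to be a capped collection because tailable cursors require one.

```typescript
import { MongoClient } from 'mongodb';

async function cursorTimeoutDemo(): Promise<void> {
  // Assumed local deployment and namespace, for illustration only.
  const client = await MongoClient.connect('mongodb://localhost:27017');
  try {
    const coll = client.db('example').collection('capped');

    // Non-tailable cursor: maxTimeMS is sent on the initial `find` command only,
    // never on the `getMore` commands that fetch later batches.
    const findCursor = coll.find({}, { maxTimeMS: 500, batchSize: 2 });
    await findCursor.toArray();

    // Tailable + awaitData cursor: maxAwaitTimeMS (not maxTimeMS) bounds how long
    // each `getMore` may block on the server waiting for new documents.
    const tailable = coll.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 1000 });
    await tailable.tryNext(); // subsequent getMore commands carry { maxTimeMS: 1000 }
    await tailable.close();
  } finally {
    await client.close();
  }
}
```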
diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 2d999e5ce29..c86c3f36859 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation { cursorId: Long; override options: GetMoreOptions; - constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) { + constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) { super(options); this.options = options; @@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation { ); } + if (this.cursorId == null || this.cursorId.isZero()) { + return callback(new MongoRuntimeError('Unable to iterate cursor with no id')); + } + const collection = this.ns.collection; if (collection == null) { // Cursors should have adopted the namespace returned by MongoDB diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index be8c155006f..c289c8e962b 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1111,7 +1111,7 @@ describe('Change Streams', function () { changeStream.next((err, doc) => { expect(err).to.exist; expect(doc).to.not.exist; - expect(err.message).to.equal('ChangeStream is closed'); + expect(err?.message).to.equal('ChangeStream is closed'); changeStream.close(() => client.close(done)); }); }); @@ -1372,23 +1372,97 @@ describe('Change Streams', function () { ) .run(); + UnifiedTestSuiteBuilder.describe('entity.watch() server-side options') + .runOnRequirement({ + topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'], + minServerVersion: '4.4.0' + }) + .createEntities([ + { client: { id: 'client0', observeEvents: ['commandStartedEvent'] } }, + { database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } }, + { collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } } + ]) + .test( + TestBuilder.it('should use maxAwaitTimeMS to send maxTimeMS on getMore commands') + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxAwaitTimeMS: 5000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + events: [ + { commandStartedEvent: { commandName: 'aggregate' } }, + { commandStartedEvent: { commandName: 'insert' } }, + { commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } } + ] + }) + .toJSON() + ) + .test( + TestBuilder.it('should send maxTimeMS on aggregate command') + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxTimeMS: 5000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + events: [ + { commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } }, + { commandStartedEvent: { commandName: 'insert' } }, + { + commandStartedEvent: { + commandName: 'getMore', + command: { 
maxTimeMS: { $$exists: false } } + } + } + ] + }) + .toJSON() + ) + .run(); + describe('BSON Options', function () { let client: MongoClient; let db: Db; let collection: Collection; let cs: ChangeStream; + beforeEach(async function () { client = await this.configuration.newClient({ monitorCommands: true }).connect(); db = client.db('db'); collection = await db.createCollection('collection'); }); + afterEach(async function () { await db.dropCollection('collection'); await cs.close(); await client.close(); - client = undefined; - db = undefined; - collection = undefined; }); context('promoteLongs', () => { @@ -1452,7 +1526,7 @@ describe('Change Streams', function () { it('does not send invalid options on the aggregate command', { metadata: { requires: { topology: '!single' } }, test: async function () { - const started = []; + const started: CommandStartedEvent[] = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); const doc = { invalidBSONOption: true }; @@ -1473,7 +1547,7 @@ describe('Change Streams', function () { it('does not send invalid options on the getMore command', { metadata: { requires: { topology: '!single' } }, test: async function () { - const started = []; + const started: CommandStartedEvent[] = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); const doc = { invalidBSONOption: true }; @@ -1503,7 +1577,7 @@ describe('ChangeStream resumability', function () { const changeStreamResumeOptions: ChangeStreamOptions = { fullDocument: 'updateLookup', collation: { locale: 'en', maxVariable: 'punct' }, - maxAwaitTimeMS: 20000, + maxAwaitTimeMS: 2000, batchSize: 200 }; diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts new file mode 100644 index 00000000000..bd36cba5c3d --- /dev/null +++ b/test/integration/crud/maxTimeMS.test.ts @@ -0,0 +1,252 @@ +import { expect } from 'chai'; +import { inspect } from 'util'; + +import { + Collection, + CommandStartedEvent, + FindCursor, + MongoClient, + MongoCursorExhaustedError, + MongoServerError +} from '../../../src'; +import { sleep } from '../../tools/utils'; +import { assert as test, setupDatabase } from '../shared'; + +describe('MaxTimeMS', function () { + before(function () { + return setupDatabase(this.configuration); + }); + + it('Should Correctly respect the maxTimeMS property on count', function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function () { + const db = client.db(configuration.db); + const col = db.collection('max_time_ms'); + + // Insert a couple of docs + const docs_1 = [{ agg_pipe: 1 }]; + + // Simple insert + col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { + expect(err).to.not.exist; + + // Execute a find command + col + .find({ $where: 'sleep(100) || true' }) + .maxTimeMS(50) + .count(function (err) { + test.ok(err != null); + client.close(done); + }); + }); + }); + }); + + it('Should Correctly respect the maxTimeMS property on toArray', { + metadata: { + requires: { + topology: ['single', 'replicaset'] + } + }, + + test: function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function () { + const db = client.db(configuration.db); + const col = db.collection('max_time_ms_2'); + + // Insert a couple of docs + const docs_1 = [{ agg_pipe: 1 }]; + + // Simple insert + 
col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { + expect(err).to.not.exist; + + // Execute a find command + col + .find({ $where: 'sleep(100) || true' }) + .maxTimeMS(50) + .toArray(function (err) { + test.ok(err != null); + client.close(done); + }); + }); + }); + } + }); + + it('Should Correctly fail with maxTimeMS error', { + // Add a tag that our runner can trigger on + // in this case we are setting that node needs to be higher than 0.10.X to run + metadata: { + requires: { + topology: ['single', 'replicaset'] + } + }, + + test: function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function () { + const db = client.db(configuration.db); + const col = db.collection('max_time_ms_5'); + + // Insert a couple of docs + const docs_1 = [{ agg_pipe: 10 }]; + + // Simple insert + col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { + expect(err).to.not.exist; + + db.admin().command( + { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'alwaysOn' }, + function (err, result) { + expect(err).to.not.exist; + test.equal(1, result?.ok); + + col + .find({}) + .maxTimeMS(10) + .toArray(function (err) { + test.ok(err != null); + + db.admin().command( + { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }, + function (err, result) { + expect(err).to.not.exist; + test.equal(1, result?.ok); + client.close(done); + } + ); + }); + } + ); + }); + }); + } + }); + + describe('awaitData, tailable, maxTimeMS, and maxAwaitTimeMS on cursors', () => { + const insertedDocs = [{ _id: 1 }]; + let client: MongoClient; + let cappedCollection: Collection<{ _id: number }>; + let cursor: FindCursor<{ _id: number }>; + let events: CommandStartedEvent[]; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + await client + .db() + .dropCollection('cappedAt3') + .catch(() => null); + await sleep(500); + cappedCollection = await client + .db() + .createCollection('cappedAt3', { capped: true, size: 4096, max: 3 }); + cappedCollection.insertMany(insertedDocs); + + events = []; + client.on('commandStarted', event => + ['getMore', 'find'].includes(event.commandName) ? 
events.push(event) : null + ); + }); + + afterEach(async function () { + events = []; + await cursor?.close(); + await client?.close(); + }); + + const tailableValues = [true, false, undefined]; + const awaitDataValues = [true, false, undefined]; + const maxTimeMSValues = [100, 0, undefined]; + const maxAwaitTimeMSValues = [100, 0, undefined]; + + const tests = tailableValues.flatMap(tailable => + awaitDataValues.flatMap(awaitData => + maxAwaitTimeMSValues.flatMap(maxAwaitTimeMS => + maxTimeMSValues.flatMap(maxTimeMS => { + const awaitDataSet = Boolean(awaitData) === true; + const tailableSet = Boolean(tailable) === true; + const timeIsSetOnGetMore = typeof maxAwaitTimeMS === 'number'; + return [ + { + options: { tailable, awaitData, maxAwaitTimeMS, maxTimeMS }, + outcome: { + // Cannot set 'awaitData' without also setting 'tailable' + isFindError: awaitDataSet && !tailableSet, + // cannot set maxTimeMS on getMore command for a non-awaitData cursor + isGetMoreError: timeIsSetOnGetMore && !awaitDataSet + } + } + ]; + }) + ) + ) + ); + + for (const { options, outcome } of tests) { + let optionsString = inspect(options, { breakLength: Infinity }); + optionsString = optionsString + .slice(2, optionsString.length - 2) + .split('undefined') + .join('omit'); + + it(`should create find cursor with ${optionsString}`, async () => { + cursor = cappedCollection.find({ _id: { $gt: 0 } }, { ...options, batchSize: 1 }); + + const findDocOrError: { _id: number } | Error = await cursor.next().catch(error => error); + + const exhaustedByFind = !!cursor.id?.isZero(); + + const getMoreDocOrError: { _id: number } | Error | null = await cursor + .tryNext() + .catch(error => error); + + expect(events).to.have.length.of.at.least(1); // At least find must be sent + + if (outcome.isFindError) { + expect(findDocOrError).to.be.instanceOf(MongoServerError); + } else { + if (findDocOrError instanceof Error) { + throw findDocOrError; + } + expect(findDocOrError).to.have.property('_id', 1); + + expect(events[0].command).to.be.an('object').that.has.a.property('find'); + const findCommand = events[0].command; + + if (typeof options.maxTimeMS === 'number') { + expect(findCommand).to.have.property('maxTimeMS', options.maxTimeMS); + } else { + expect(findCommand).to.not.have.property('maxTimeMS'); + } + } + + if (outcome.isGetMoreError) { + expect(getMoreDocOrError).to.be.instanceOf(MongoServerError); + } else if (exhaustedByFind) { + expect(getMoreDocOrError).to.be.instanceOf(MongoCursorExhaustedError); + } else { + if (getMoreDocOrError instanceof Error) { + throw getMoreDocOrError; + } + expect(getMoreDocOrError).to.be.null; + + expect(events[1].command).to.be.an('object').that.has.a.property('getMore'); + const getMoreCommand = events[1].command; + + if (typeof options.maxAwaitTimeMS === 'number') { + expect(getMoreCommand).to.have.property('maxTimeMS', options.maxAwaitTimeMS); + } else { + expect(getMoreCommand).to.not.have.property('maxTimeMS'); + } + } + + await cursor.close(); + }); + } + }); +}); diff --git a/test/integration/crud/maxtimems.test.js b/test/integration/crud/maxtimems.test.js deleted file mode 100644 index f7ca1acb4ba..00000000000 --- a/test/integration/crud/maxtimems.test.js +++ /dev/null @@ -1,120 +0,0 @@ -'use strict'; -const { assert: test, setupDatabase } = require('../shared'); -const { expect } = require('chai'); - -describe('MaxTimeMS', function () { - before(function () { - return setupDatabase(this.configuration); - }); - - it('Should Correctly respect the maxtimeMs property on count', 
function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .count(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - }); - - it('Should Correctly respect the maxtimeMs property on toArray', { - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms_2'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .toArray(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - } - }); - - it('Should Correctly fail with maxTimeMS error', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms_5'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 10 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'alwaysOn' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result.ok); - - col - .find({}) - .maxTimeMS(10) - .toArray(function (err) { - test.ok(err != null); - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result.ok); - client.close(done); - } - ); - }); - } - ); - }); - }); - } - }); -});
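The two unified tests added in change_stream.test.ts pin down the same split for change streams: `maxTimeMS` is sent on the `aggregate` command only, while `maxAwaitTimeMS` is forwarded as `maxTimeMS` on each `getMore`. A small usage sketch, not part of the patch; the database and collection names mirror the test entities and the helper itself is hypothetical.

```typescript
import type { MongoClient } from 'mongodb';

async function watchWithTimeouts(client: MongoClient): Promise<void> {
  const collection = client.db('watchOpts').collection('watchOpts');

  // maxTimeMS limits the initial `aggregate` command; it is not added to `getMore`.
  // maxAwaitTimeMS becomes `maxTimeMS` on every `getMore`, bounding how long the
  // server waits for new change events before returning an empty batch.
  const changeStream = collection.watch([], { maxTimeMS: 5000, maxAwaitTimeMS: 5000 });

  const change = await changeStream.tryNext(); // null if no change arrived within ~5 seconds
  console.log(change);

  await changeStream.close();
}
```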
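The option-matrix test in the new maxTimeMS.test.ts enumerates every combination of `tailable`, `awaitData`, `maxTimeMS`, and `maxAwaitTimeMS` and derives which command should fail. Restated as a standalone predicate, this is a hypothetical helper that mirrors the test's `outcome` computation; it is not code in the driver.

```typescript
interface CursorTimeoutOptions {
  tailable?: boolean;
  awaitData?: boolean;
  maxTimeMS?: number;
  maxAwaitTimeMS?: number;
}

// Mirrors the `outcome` object built inside the test's flatMap matrix.
function expectedOutcome(options: CursorTimeoutOptions): { isFindError: boolean; isGetMoreError: boolean } {
  const tailableSet = options.tailable === true;
  const awaitDataSet = options.awaitData === true;
  const timeIsSetOnGetMore = typeof options.maxAwaitTimeMS === 'number';
  return {
    // The server rejects `awaitData` unless `tailable` is also set.
    isFindError: awaitDataSet && !tailableSet,
    // The server rejects `maxTimeMS` on `getMore` for a cursor that is not awaitData.
    isGetMoreError: timeIsSetOnGetMore && !awaitDataSet
  };
}

// e.g. expectedOutcome({ maxAwaitTimeMS: 100 }) => { isFindError: false, isGetMoreError: true }
```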