Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jul 27, 2022
1 parent 0754bf9 commit f8647fa
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 144 deletions.
38 changes: 22 additions & 16 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -79,7 +79,22 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
readPreference?: ReadPreferenceLike;
readConcern?: ReadConcernLike;
batchSize?: number;
/**
* For **`tailable=false` cursor** OR **`tailable=true && awaitData=false` cursor**,
* - the driver MUST set `maxTimeMS` on the `find` command and MUST NOT set `maxTimeMS` on the `getMore` command.
* - If `maxTimeMS` is not set in options, the driver SHOULD refrain from setting **maxTimeMS**
*
* For **`tailable=true && awaitData=true` cursor**
* - the driver MUST provide a cursor level option named `maxAwaitTimeMS`.
* - The `maxTimeMS` option on the `getMore` command MUST be set to the value of the option `maxAwaitTimeMS`.
* - If no `maxAwaitTimeMS` is specified, the driver MUST not set `maxTimeMS` on the `getMore` command.
* - `maxAwaitTimeMS` option is not set on the `aggregate` command nor `$changeStream` pipeline stage
*
* ## `maxCommitTimeMS`
* Note, this option is an alias for the `maxTimeMS` commitTransaction command option.
*/
maxTimeMS?: number;
maxAwaitTimeMS?: number;
/**
* Comment to apply to the operation.
*
Expand Down Expand Up @@ -155,7 +170,7 @@ export abstract class AbstractCursor<
}
this[kClient] = client;
this[kNamespace] = namespace;
this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
this[kDocuments] = [];
this[kInitialized] = false;
this[kClosed] = false;
this[kKilled] = false;
Expand Down Expand Up @@ -186,6 +201,10 @@ export abstract class AbstractCursor<
this[kOptions].maxTimeMS = options.maxTimeMS;
}

if (typeof options.maxAwaitTimeMS === 'number') {
this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
}

if (options.session instanceof ClientSession) {
this[kSession] = options.session;
} else {
Expand Down Expand Up @@ -617,21 +636,8 @@ export abstract class AbstractCursor<

/** @internal */
_getMore(batchSize: number, callback: Callback<Document>): void {
const cursorId = this[kId];
const cursorNs = this[kNamespace];
const server = this[kServer];

if (cursorId == null) {
callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
return;
}

if (server == null) {
callback(new MongoRuntimeError('Unable to iterate cursor without selected server'));
return;
}

const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
...this[kOptions],
session: this[kSession],
batchSize
Expand Down
1 change: 1 addition & 0 deletions src/operations/command.ts
Expand Up @@ -44,6 +44,7 @@ export interface CommandOperationOptions
readConcern?: ReadConcernLike;
/** Collation */
collation?: CollationOptions;
/** TODO This is probably in the wrong place................. specs only mention this being a thing for createIndex/dropIndex */
maxTimeMS?: number;
/**
* Comment to apply to the operation.
Expand Down
6 changes: 5 additions & 1 deletion src/operations/get_more.ts
Expand Up @@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
override options: GetMoreOptions;

constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) {
super(options);

this.options = options;
Expand All @@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation {
);
}

if (this.cursorId == null || this.cursorId.isZero()) {
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
}

const collection = this.ns.collection;
if (collection == null) {
// Cursors should have adopted the namespace returned by MongoDB
Expand Down
88 changes: 81 additions & 7 deletions test/integration/change-streams/change_stream.test.ts
Expand Up @@ -1111,7 +1111,7 @@ describe('Change Streams', function () {
changeStream.next((err, doc) => {
expect(err).to.exist;
expect(doc).to.not.exist;
expect(err.message).to.equal('ChangeStream is closed');
expect(err?.message).to.equal('ChangeStream is closed');
changeStream.close(() => client.close(done));
});
});
Expand Down Expand Up @@ -1372,23 +1372,97 @@ describe('Change Streams', function () {
)
.run();

UnifiedTestSuiteBuilder.describe('entity.watch() server-side options')
.runOnRequirement({
topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'],
minServerVersion: '4.4.0'
})
.createEntities([
{ client: { id: 'client0', observeEvents: ['commandStartedEvent'] } },
{ database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } },
{ collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } }
])
.test(
TestBuilder.it('should use maxAwaitTimeMS to send maxTimeMS on getMore commands')
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxAwaitTimeMS: 5000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
events: [
{ commandStartedEvent: { commandName: 'aggregate' } },
{ commandStartedEvent: { commandName: 'insert' } },
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } }
]
})
.toJSON()
)
.test(
TestBuilder.it('should send maxTimeMS on aggregate command')
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxTimeMS: 5000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
events: [
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
{ commandStartedEvent: { commandName: 'insert' } },
{
commandStartedEvent: {
commandName: 'getMore',
command: { maxTimeMS: { $$exists: false } }
}
}
]
})
.toJSON()
)
.run();

describe('BSON Options', function () {
let client: MongoClient;
let db: Db;
let collection: Collection;
let cs: ChangeStream;

beforeEach(async function () {
client = await this.configuration.newClient({ monitorCommands: true }).connect();
db = client.db('db');
collection = await db.createCollection('collection');
});

afterEach(async function () {
await db.dropCollection('collection');
await cs.close();
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('promoteLongs', () => {
Expand Down Expand Up @@ -1452,7 +1526,7 @@ describe('Change Streams', function () {
it('does not send invalid options on the aggregate command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
Expand All @@ -1473,7 +1547,7 @@ describe('Change Streams', function () {
it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
Expand Down Expand Up @@ -1503,7 +1577,7 @@ describe('ChangeStream resumability', function () {
const changeStreamResumeOptions: ChangeStreamOptions = {
fullDocument: 'updateLookup',
collation: { locale: 'en', maxVariable: 'punct' },
maxAwaitTimeMS: 20000,
maxAwaitTimeMS: 2000,
batchSize: 200
};

Expand Down

0 comments on commit f8647fa

Please sign in to comment.