Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-2843): implement sessions advanceClusterTime method #2920

Merged
merged 5 commits into from Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/sdam/common.ts
Expand Up @@ -66,16 +66,16 @@ export interface ClusterTime {
};
}

/** Shared function to determine clusterTime for a given topology */
export function resolveClusterTime(
topology: Topology | ClientSession,
/** Shared function to determine clusterTime for a given topology or session */
export function _advanceClusterTime(
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
entity: Topology | ClientSession,
$clusterTime: ClusterTime
): void {
if (topology.clusterTime == null) {
topology.clusterTime = $clusterTime;
if (entity.clusterTime == null) {
entity.clusterTime = $clusterTime;
} else {
if ($clusterTime.clusterTime.greaterThan(topology.clusterTime.clusterTime)) {
topology.clusterTime = $clusterTime;
if ($clusterTime.clusterTime.greaterThan(entity.clusterTime.clusterTime)) {
entity.clusterTime = $clusterTime;
}
}
}
4 changes: 2 additions & 2 deletions src/sdam/topology.ts
Expand Up @@ -28,7 +28,7 @@ import {
ServerType,
ClusterTime,
TimerQueue,
resolveClusterTime,
_advanceClusterTime,
drainTimerQueue,
clearAndRemoveTimerFrom,
STATE_CLOSED,
Expand Down Expand Up @@ -681,7 +681,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
// value of the clusterTime embedded field."
const clusterTime = serverDescription.$clusterTime;
if (clusterTime) {
resolveClusterTime(this, clusterTime);
_advanceClusterTime(this, clusterTime);
}

// If we already know all the information contained in this updated description, then
Expand Down
31 changes: 29 additions & 2 deletions src/sessions.ts
Expand Up @@ -2,7 +2,7 @@ import { PromiseProvider } from './promise_provider';
import { Binary, Long, Timestamp, Document } from './bson';
import { ReadPreference } from './read_preference';
import { isTransactionCommand, TxnState, Transaction, TransactionOptions } from './transactions';
import { resolveClusterTime, ClusterTime } from './sdam/common';
import { _advanceClusterTime, ClusterTime } from './sdam/common';
import { isSharded } from './cmap/wire_protocol/shared';
import {
MongoError,
Expand Down Expand Up @@ -249,6 +249,33 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}
}

/**
* Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
*
* @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
*/
advanceClusterTime(clusterTime: ClusterTime): void {
if (!clusterTime || typeof clusterTime !== 'object') {
throw new MongoInvalidArgumentError('input cluster time must be an object');
}
if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
throw new MongoInvalidArgumentError(
'input cluster time "clusterTime" property must be a valid BSON Timestamp'
);
}
if (
!clusterTime.signature ||
clusterTime.signature.hash?._bsontype !== 'Binary' ||
clusterTime.signature.keyId?._bsontype !== 'Long'
) {
throw new MongoInvalidArgumentError(
'input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId'
);
}

_advanceClusterTime(this, clusterTime);
}

/**
* Used to determine if this session equals another
*
Expand Down Expand Up @@ -886,7 +913,7 @@ export function applySession(

export function updateSessionFromResponse(session: ClientSession, document: Document): void {
if (document.$clusterTime) {
resolveClusterTime(session, document.$clusterTime);
_advanceClusterTime(session, document.$clusterTime);
}

if (document.operationTime && session && session.supports.causalConsistency) {
Expand Down
126 changes: 126 additions & 0 deletions test/unit/core/sessions.test.js
@@ -1,13 +1,22 @@
'use strict';

const mock = require('../../tools/mock');
const BSON = require('bson');
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const { expect } = require('chai');
const { genClusterTime, sessionCleanupHandler } = require('./common');
const { Topology } = require('../../../src/sdam/topology');
const { ServerSessionPool, ServerSession, ClientSession } = require('../../../src/sessions');
const { now } = require('../../../src/utils');

let test = {};

const generateClusterTime = time => {
return {
clusterTime: new BSON.Timestamp(time),
signature: { hash: new BSON.Binary('test'), keyId: new BSON.Long(1) }
};
};

describe('Sessions - unit/core', function () {
describe('ClientSession', function () {
let session;
Expand Down Expand Up @@ -84,6 +93,123 @@ describe('Sessions - unit/core', function () {
}
});
});

describe('advanceClusterTime()', () => {
// TODO: add functional tests to make sure:
// 1) using the method to update session clusterTime results in usable session
// 2) using the method to update session to invalid time results in unusable session but still usable client
beforeEach(() => {
const client = new Topology('localhost:27017', {});
sessionPool = client.s.sessionPool;
session = new ClientSession(client, sessionPool, {});
});
it('should throw an error if the input cluster time is not an object', {
metadata: { requires: { topology: 'single' } },
test: function () {
const invalidInputs = [undefined, null, 3, 'a'];
for (const input of invalidInputs) {
expect(() => session.advanceClusterTime(input)).to.throw(
'input cluster time must be an object'
);
}
}
});

it(
'should throw an error if the input cluster time is missing a valid clusterTime property',
{
metadata: { requires: { topology: 'single' } },
test: function () {
const invalidInputs = Array(5)
.fill(1)
.map(time => generateClusterTime(time));

delete invalidInputs[0].clusterTime;
invalidInputs[1].clusterTime = null;
invalidInputs[2].clusterTime = 5;
invalidInputs[3].clusterTime = 'not a timestamp';
invalidInputs[4].clusterTime = new Date('1');

for (const input of invalidInputs) {
expect(
() => session.advanceClusterTime(input),
`expected to fail on input: ${JSON.stringify(input)}`
).to.throw(
'input cluster time "clusterTime" property must be a valid BSON Timestamp'
);
}
}
}
);

it('should throw an error if the input cluster time is missing a valid signature property', {
metadata: { requires: { topology: 'single' } },
test: function () {
const invalidInputs = Array(9)
.fill(1)
.map(time => generateClusterTime(time));

// null types
delete invalidInputs[0].signature;
delete invalidInputs[1].signature.hash;
delete invalidInputs[2].signature.keyId;
invalidInputs[3].signature.hash = null;
invalidInputs[4].signature.keyId = null;
// invalid non-null types
invalidInputs[5].signature.keyId = 1;
invalidInputs[6].signature.keyId = 'not BSON Long';
invalidInputs[7].signature.hash = 123;
invalidInputs[8].signature.hash = 'not BSON Binary';

for (const input of invalidInputs) {
expect(
() => session.advanceClusterTime(input),
`expected to fail on input: ${JSON.stringify(input)}`
).to.throw(
'input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId'
);
}
}
});

it('should set the session clusterTime to the one provided if the existing session clusterTime is null', () => {
expect(session).property('clusterTime').to.be.undefined;
const validTime = generateClusterTime(100);
session.advanceClusterTime(validTime);
expect(session).property('clusterTime').to.equal(validTime);

session.clusterTime = null;
expect(session).property('clusterTime').to.be.null;
session.advanceClusterTime(validTime);
expect(session).property('clusterTime').to.equal(validTime);
});

it('should set the session clusterTime to the one provided if it is greater than the the existing session clusterTime', () => {
const validInitialTime = generateClusterTime(100);
const validGreaterTime = generateClusterTime(200);

session.advanceClusterTime(validInitialTime);
expect(session).property('clusterTime').to.equal(validInitialTime);

session.advanceClusterTime(validGreaterTime);
expect(session).property('clusterTime').to.equal(validGreaterTime);
});

it('should leave the session clusterTime unchanged if it is less than or equal to the the existing session clusterTime', () => {
const validInitialTime = generateClusterTime(100);
const validEqualTime = generateClusterTime(100);
const validLesserTime = generateClusterTime(50);

session.advanceClusterTime(validInitialTime);
expect(session).property('clusterTime').to.equal(validInitialTime);

session.advanceClusterTime(validEqualTime);
expect(session).property('clusterTime').to.equal(validInitialTime); // the reference check ensures no update happened

session.advanceClusterTime(validLesserTime);
expect(session).property('clusterTime').to.equal(validInitialTime);
});
});
});

describe('ServerSessionPool', function () {
Expand Down