Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6057): implement CursorResponse for lazy document parsing #4085

Merged
merged 25 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c5b8d78
refactor(NODE-6057): implement CursorResponse for lazy document parsing
nbbeeken Apr 17, 2024
0e22782
chore: remove the need for unshift
nbbeeken Apr 17, 2024
3144b11
chore: lint
nbbeeken Apr 17, 2024
78f9068
chore: fix FLE, may revert.
nbbeeken Apr 17, 2024
f42584c
chore: fix ts
nbbeeken Apr 17, 2024
8bd2a47
fix: old servers
nbbeeken Apr 18, 2024
68077e9
fix: return consistent type
nbbeeken Apr 18, 2024
5fb54d3
fix: unit
nbbeeken Apr 18, 2024
df5121a
perf: define methods on cursor response
nbbeeken Apr 19, 2024
053f01f
refactor: replace isBuffer check with isUint8Array
nbbeeken Apr 23, 2024
7edb948
fix: add correct ts to execute
nbbeeken Apr 23, 2024
4631e34
docs: add comment for base64 string
nbbeeken Apr 23, 2024
ac5118e
feat: remove valuesAs just use indexing
nbbeeken Apr 23, 2024
9f80b5b
feat: required fields by moving error check out of constructor
nbbeeken Apr 23, 2024
0902172
fix: ns is not a required field
nbbeeken Apr 23, 2024
ea365c1
fix: rename hasNext
nbbeeken Apr 23, 2024
24b7678
fix: shift: false wrong for stream, stream uses same settings as `cur…
nbbeeken Apr 23, 2024
195bd62
fix: stream cannot use cursor's next because it reaches exhaustion er…
nbbeeken Apr 23, 2024
2f10f0e
test: add unit tests for cursor response ctor
nbbeeken Apr 23, 2024
ca88168
fix: batch always exists and emptyGetMore made simpler
nbbeeken Apr 23, 2024
0533414
test: demonstrate internal decorate feature breakage
nbbeeken Apr 25, 2024
3ee316d
fix: revert changes to FLE and exempt from using responseType
nbbeeken Apr 25, 2024
c0b0a86
test: fixes
nbbeeken Apr 25, 2024
26be067
test: lower server version for decorateDecryptionResult test
nbbeeken Apr 29, 2024
0d801a6
fix: remove encryption flag from find
nbbeeken Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ import type { ClientMetadata } from './handshake/client_metadata';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { type CompressorName, decompressResponse } from './wire_protocol/compression';
import { onData } from './wire_protocol/on_data';
import { MongoDBResponse, type MongoDBResponseConstructor } from './wire_protocol/responses';
import {
isErrorResponse,
MongoDBResponse,
type MongoDBResponseConstructor
} from './wire_protocol/responses';
import { getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -443,7 +447,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.socket.setTimeout(0);
const bson = response.parse();

const document = new (responseType ?? MongoDBResponse)(bson, 0, false);
const document =
responseType == null
? new MongoDBResponse(bson)
: isErrorResponse(bson)
? new MongoDBResponse(bson)
: new responseType(bson);

yield document;
this.throwIfAborted();
Expand Down Expand Up @@ -739,7 +748,7 @@ export class CryptoConnection extends Connection {
ns: MongoDBNamespace,
cmd: Document,
options?: CommandOptions,
responseType?: T | undefined
_responseType?: T | undefined
): Promise<Document> {
const { autoEncrypter } = this;
if (!autoEncrypter) {
Expand All @@ -753,7 +762,7 @@ export class CryptoConnection extends Connection {
const serverWireVersion = maxWireVersion(this);
if (serverWireVersion === 0) {
// This means the initial handshake hasn't happened yet
return await super.command<T>(ns, cmd, options, responseType);
return await super.command<T>(ns, cmd, options, undefined);
}

if (serverWireVersion < 8) {
Expand Down Expand Up @@ -787,7 +796,7 @@ export class CryptoConnection extends Connection {
}
}

const response = await super.command<T>(ns, encrypted, options, responseType);
const response = await super.command<T>(ns, encrypted, options, undefined);

return await autoEncrypter.decrypt(response, options);
}
Expand Down
57 changes: 36 additions & 21 deletions src/cmap/wire_protocol/on_demand/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class OnDemandDocument {
private readonly indexFound: Record<number, boolean> = Object.create(null);

/** All bson elements in this document */
private readonly elements: BSONElement[];
private readonly elements: ReadonlyArray<BSONElement>;

constructor(
/** BSON bytes, this document begins at offset */
Expand Down Expand Up @@ -97,14 +97,30 @@ export class OnDemandDocument {
* @param name - a basic latin string name of a BSON element
* @returns
*/
private getElement(name: string): CachedBSONElement | null {
private getElement(name: string | number): CachedBSONElement | null {
const cachedElement = this.cache[name];
if (cachedElement === false) return null;

if (cachedElement != null) {
return cachedElement;
}

if (typeof name === 'number') {
if (this.isArray) {
if (name < this.elements.length) {
const element = this.elements[name];
const cachedElement = { element, value: undefined };
this.cache[name] = cachedElement;
this.indexFound[name] = true;
return cachedElement;
} else {
return null;
}
} else {
return null;
}
}

for (let index = 0; index < this.elements.length; index++) {
const element = this.elements[index];

Expand Down Expand Up @@ -197,6 +213,13 @@ export class OnDemandDocument {
}
}

/**
* Returns the number of elements in this BSON document
*/
public size() {
return this.elements.length;
}

/**
* Checks for the existence of an element by name.
*
Expand All @@ -222,16 +245,20 @@ export class OnDemandDocument {
* @param required - whether or not the element is expected to exist, if true this function will throw if it is not present
*/
public get<const T extends keyof JSTypeOf>(
name: string,
name: string | number,
as: T,
required?: false | undefined
): JSTypeOf[T] | null;

/** `required` will make `get` throw if name does not exist or is null/undefined */
public get<const T extends keyof JSTypeOf>(name: string, as: T, required: true): JSTypeOf[T];
public get<const T extends keyof JSTypeOf>(
name: string | number,
as: T,
required: true
): JSTypeOf[T];

public get<const T extends keyof JSTypeOf>(
name: string,
name: string | number,
as: T,
required?: boolean
): JSTypeOf[T] | null {
Expand Down Expand Up @@ -303,21 +330,9 @@ export class OnDemandDocument {
});
}

/**
* Iterates through the elements of a document reviving them using the `as` BSONType.
*
* @param as - The type to revive all elements as
*/
public *valuesAs<const T extends keyof JSTypeOf>(as: T): Generator<JSTypeOf[T]> {
if (!this.isArray) {
throw new BSONError('Unexpected conversion of non-array value to array');
}
let counter = 0;
for (const element of this.elements) {
const value = this.toJSValue<T>(element, as);
this.cache[counter] = { element, value };
yield value;
counter += 1;
}
/** Returns this document's bytes only */
toBytes() {
const size = getInt32LE(this.bson, this.offset);
return this.bson.subarray(this.offset, this.offset + size);
}
}
150 changes: 139 additions & 11 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,73 @@
import { type BSONSerializeOptions, BSONType, type Document, type Timestamp } from '../../bson';
import {
type BSONSerializeOptions,
BSONType,
type Document,
Long,
parseToElementsToArray,
type Timestamp
} from '../../bson';
import { MongoUnexpectedServerResponseError } from '../../error';
import { type ClusterTime } from '../../sdam/common';
import { type MongoDBNamespace, ns } from '../../utils';
import { OnDemandDocument } from './on_demand/document';

// eslint-disable-next-line no-restricted-syntax
const enum BSONElementOffset {
type = 0,
nameOffset = 1,
nameLength = 2,
offset = 3,
length = 4
}
/**
* Accepts a BSON payload and checks for na "ok: 0" element.
* This utility is intended to prevent calling response class constructors
* that expect the result to be a success and demand certain properties to exist.
*
* For example, a cursor response always expects a cursor embedded document.
* In order to write the class such that the properties reflect that assertion (non-null)
* we cannot invoke the subclass constructor if the BSON represents an error.
*
* @param bytes - BSON document returned from the server
*/
export function isErrorResponse(bson: Uint8Array): boolean {
const elements = parseToElementsToArray(bson, 0);
for (let eIdx = 0; eIdx < elements.length; eIdx++) {
const element = elements[eIdx];

if (element[BSONElementOffset.nameLength] === 2) {
const nameOffset = element[BSONElementOffset.nameOffset];

// 111 == "o", 107 == "k"
if (bson[nameOffset] === 111 && bson[nameOffset + 1] === 107) {
const valueOffset = element[BSONElementOffset.offset];
const valueLength = element[BSONElementOffset.length];

// If any byte in the length of the ok number (works for any type) is non zero,
// then it is considered "ok: 1"
for (let i = valueOffset; i < valueOffset + valueLength; i++) {
if (bson[i] !== 0x00) return false;
}

return true;
}
}
}

return true;
}

/** @internal */
export type MongoDBResponseConstructor = {
new (bson: Uint8Array, offset?: number, isArray?: boolean): MongoDBResponse;
};

/** @internal */
export class MongoDBResponse extends OnDemandDocument {
static is(value: unknown): value is MongoDBResponse {
return value instanceof MongoDBResponse;
}

// {ok:1}
static empty = new MongoDBResponse(new Uint8Array([13, 0, 0, 0, 16, 111, 107, 0, 1, 0, 0, 0, 0]));

Expand Down Expand Up @@ -83,27 +142,96 @@ export class MongoDBResponse extends OnDemandDocument {
return this.clusterTime ?? null;
}

public override toObject(options: BSONSerializeOptions = {}): Record<string, any> {
public override toObject(options?: BSONSerializeOptions): Record<string, any> {
const exactBSONOptions = {
useBigInt64: options.useBigInt64,
promoteLongs: options.promoteLongs,
promoteValues: options.promoteValues,
promoteBuffers: options.promoteBuffers,
bsonRegExp: options.bsonRegExp,
raw: options.raw ?? false,
fieldsAsRaw: options.fieldsAsRaw ?? {},
useBigInt64: options?.useBigInt64,
promoteLongs: options?.promoteLongs,
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
promoteValues: options?.promoteValues,
promoteBuffers: options?.promoteBuffers,
bsonRegExp: options?.bsonRegExp,
raw: options?.raw ?? false,
fieldsAsRaw: options?.fieldsAsRaw ?? {},
validation: this.parseBsonSerializationOptions(options)
};
return super.toObject(exactBSONOptions);
}

private parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): {
private parseBsonSerializationOptions(options?: { enableUtf8Validation?: boolean }): {
utf8: { writeErrors: false } | false;
} {
const enableUtf8Validation = options?.enableUtf8Validation;
if (enableUtf8Validation === false) {
return { utf8: false };
}

return { utf8: { writeErrors: false } };
}
}

/** @internal */
export class CursorResponse extends MongoDBResponse {
/**
* This supports a feature of the FindCursor.
* It is an optimization to avoid an extra getMore when the limit has been reached
*/
static emptyGetMore = { id: new Long(0), length: 0, shift: () => null };

static override is(value: unknown): value is CursorResponse {
return value instanceof CursorResponse || value === CursorResponse.emptyGetMore;
}

public id: Long;
public ns: MongoDBNamespace | null = null;
public batchSize = 0;

private batch: OnDemandDocument;
private iterated = 0;

constructor(bytes: Uint8Array, offset?: number, isArray?: boolean) {
super(bytes, offset, isArray);

const cursor = this.get('cursor', BSONType.object, true);

const id = cursor.get('id', BSONType.long, true);
this.id = new Long(Number(id & 0xffff_ffffn), Number((id >> 32n) & 0xffff_ffffn));

baileympearson marked this conversation as resolved.
Show resolved Hide resolved
const namespace = cursor.get('ns', BSONType.string);
if (namespace != null) this.ns = ns(namespace);

if (cursor.has('firstBatch')) this.batch = cursor.get('firstBatch', BSONType.array, true);
else if (cursor.has('nextBatch')) this.batch = cursor.get('nextBatch', BSONType.array, true);
else throw new MongoUnexpectedServerResponseError('Cursor document did not contain a batch');

nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
this.batchSize = this.batch.size();
}

get length() {
return Math.max(this.batchSize - this.iterated, 0);
}
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

shift(options?: BSONSerializeOptions): any {
if (this.iterated >= this.batchSize) {
return null;
}

const result = this.batch.get(this.iterated, BSONType.object, true) ?? null;
this.iterated += 1;

if (options?.raw) {
return result.toBytes();
} else {
return result.toObject(options);
}
}

clear() {
this.iterated = this.batchSize;
}

pushMany() {
throw new Error('pushMany Unsupported method');
}

push() {
throw new Error('push Unsupported method');
}
}