Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(model): add hydrate option to Model.watch() to automatically hydrate fullDocument #12121

Merged
merged 1 commit into from Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 39 additions & 1 deletion lib/cursor/ChangeStream.js
Expand Up @@ -20,6 +20,13 @@ class ChangeStream extends EventEmitter {
this.pipeline = pipeline;
this.options = options;

if (options && options.hydrate && !options.model) {
throw new Error(
'Cannot create change stream with `hydrate: true` ' +
'unless calling `Model.watch()`'
);
}

// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
Expand All @@ -46,7 +53,12 @@ class ChangeStream extends EventEmitter {
});

['close', 'change', 'end', 'error'].forEach(ev => {
this.driverChangeStream.on(ev, data => this.emit(ev, data));
this.driverChangeStream.on(ev, data => {
if (data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
});
});
});

Expand All @@ -69,6 +81,32 @@ class ChangeStream extends EventEmitter {
}

next(cb) {
if (this.options && this.options.hydrate) {
if (cb != null) {
const originalCb = cb;
cb = (err, data) => {
if (err != null) {
return originalCb(err);
}
if (data.fullDocument != null) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
return originalCb(null, data);
};
}

let maybePromise = this.driverChangeStream.next(cb);
if (maybePromise && typeof maybePromise.then === 'function') {
maybePromise = maybePromise.then(data => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Smart usage of promises right there.

if (data.fullDocument != null) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
return data;
});
}
return maybePromise;
}

return this.driverChangeStream.next(cb);
}

Expand Down
4 changes: 4 additions & 0 deletions lib/model.js
Expand Up @@ -3177,6 +3177,7 @@ Model.create = function create(doc, options, callback) {
*
* @param {Array} [pipeline]
* @param {Object} [options] see the [mongodb driver options](https://mongodb.github.io/node-mongodb-native/3.0/api/Collection.html#watch)
* @param {Boolean} [options.hydrate=false] if true and `fullDocument: 'updateLookup'` is set, Mongoose will automatically hydrate `fullDocument` into a fully fledged Mongoose document
* @return {ChangeStream} mongoose-specific change stream wrapper, inherits from EventEmitter
* @api public
*/
Expand All @@ -3201,6 +3202,9 @@ Model.watch = function(pipeline, options) {
}
};

options = options || {};
options.model = this;

return new ChangeStream(changeStreamThunk, pipeline, options);
};

Expand Down
21 changes: 21 additions & 0 deletions test/model.test.js
Expand Up @@ -5197,6 +5197,27 @@ describe('Model', function() {
doc._id.toHexString());
});

it('fullDocument (gh-11936)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const changeStream = await MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
});

const doc = await MyModel.create({ name: 'Ned Stark' });

const p = changeStream.next();
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
assert.equal(changeData.operationType, 'update');
assert.equal(changeData.fullDocument._id.toHexString(),
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');
});

it('respects discriminators (gh-11007)', async function() {
const BaseModel = db.model('Test', new Schema({ name: String }));
const ChildModel = BaseModel.discriminator('Test1', new Schema({ email: String }));
Expand Down
2 changes: 1 addition & 1 deletion types/models.d.ts
Expand Up @@ -288,7 +288,7 @@ declare module 'mongoose' {
validate(optional: any, pathsToValidate: PathsToValidate, callback?: CallbackWithoutResult): Promise<void>;

/** Watches the underlying collection for changes using [MongoDB change streams](https://docs.mongodb.com/manual/changeStreams/). */
watch<ResultType extends mongodb.Document = any>(pipeline?: Array<Record<string, unknown>>, options?: mongodb.ChangeStreamOptions): mongodb.ChangeStream<ResultType>;
watch<ResultType extends mongodb.Document = any>(pipeline?: Array<Record<string, unknown>>, options?: mongodb.ChangeStreamOptions & { hydrate?: boolean }): mongodb.ChangeStream<ResultType>;

/** Adds a `$where` clause to this query */
$where(argument: string | Function): QueryWithHelpers<Array<HydratedDocument<T, TMethodsAndOverrides, TVirtuals>>, HydratedDocument<T, TMethodsAndOverrides, TVirtuals>, TQueryHelpers, T>;
Expand Down