Skip to content

Commit

Permalink
Merge pull request #12121 from Automattic/vkarpov15/gh-11936
Browse files Browse the repository at this point in the history
feat(model): add `hydrate` option to `Model.watch()` to automatically hydrate `fullDocument`
  • Loading branch information
vkarpov15 committed Jul 26, 2022
2 parents 281836d + 901f996 commit f62cf52
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
40 changes: 39 additions & 1 deletion lib/cursor/ChangeStream.js
Expand Up @@ -20,6 +20,13 @@ class ChangeStream extends EventEmitter {
this.pipeline = pipeline;
this.options = options;

if (options && options.hydrate && !options.model) {
throw new Error(
'Cannot create change stream with `hydrate: true` ' +
'unless calling `Model.watch()`'
);
}

// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
Expand All @@ -46,7 +53,12 @@ class ChangeStream extends EventEmitter {
});

['close', 'change', 'end', 'error'].forEach(ev => {
this.driverChangeStream.on(ev, data => this.emit(ev, data));
this.driverChangeStream.on(ev, data => {
if (data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
});
});
});

Expand All @@ -69,6 +81,32 @@ class ChangeStream extends EventEmitter {
}

next(cb) {
if (this.options && this.options.hydrate) {
if (cb != null) {
const originalCb = cb;
cb = (err, data) => {
if (err != null) {
return originalCb(err);
}
if (data.fullDocument != null) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
return originalCb(null, data);
};
}

let maybePromise = this.driverChangeStream.next(cb);
if (maybePromise && typeof maybePromise.then === 'function') {
maybePromise = maybePromise.then(data => {
if (data.fullDocument != null) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
return data;
});
}
return maybePromise;
}

return this.driverChangeStream.next(cb);
}

Expand Down
4 changes: 4 additions & 0 deletions lib/model.js
Expand Up @@ -3177,6 +3177,7 @@ Model.create = function create(doc, options, callback) {
*
* @param {Array} [pipeline]
* @param {Object} [options] see the [mongodb driver options](https://mongodb.github.io/node-mongodb-native/3.0/api/Collection.html#watch)
* @param {Boolean} [options.hydrate=false] if true and `fullDocument: 'updateLookup'` is set, Mongoose will automatically hydrate `fullDocument` into a fully fledged Mongoose document
* @return {ChangeStream} mongoose-specific change stream wrapper, inherits from EventEmitter
* @api public
*/
Expand All @@ -3201,6 +3202,9 @@ Model.watch = function(pipeline, options) {
}
};

options = options || {};
options.model = this;

return new ChangeStream(changeStreamThunk, pipeline, options);
};

Expand Down
21 changes: 21 additions & 0 deletions test/model.test.js
Expand Up @@ -5197,6 +5197,27 @@ describe('Model', function() {
doc._id.toHexString());
});

it('fullDocument (gh-11936)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const changeStream = await MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
});

const doc = await MyModel.create({ name: 'Ned Stark' });

const p = changeStream.next();
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
assert.equal(changeData.operationType, 'update');
assert.equal(changeData.fullDocument._id.toHexString(),
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');
});

it('respects discriminators (gh-11007)', async function() {
const BaseModel = db.model('Test', new Schema({ name: String }));
const ChildModel = BaseModel.discriminator('Test1', new Schema({ email: String }));
Expand Down
2 changes: 1 addition & 1 deletion types/models.d.ts
Expand Up @@ -289,7 +289,7 @@ declare module 'mongoose' {
validate(optional: any, pathsToValidate: PathsToValidate, callback?: CallbackWithoutResult): Promise<void>;

/** Watches the underlying collection for changes using [MongoDB change streams](https://docs.mongodb.com/manual/changeStreams/). */
watch<ResultType extends mongodb.Document = any>(pipeline?: Array<Record<string, unknown>>, options?: mongodb.ChangeStreamOptions): mongodb.ChangeStream<ResultType>;
watch<ResultType extends mongodb.Document = any>(pipeline?: Array<Record<string, unknown>>, options?: mongodb.ChangeStreamOptions & { hydrate?: boolean }): mongodb.ChangeStream<ResultType>;

/** Adds a `$where` clause to this query */
$where(argument: string | Function): QueryWithHelpers<Array<HydratedDocument<T, TMethodsAndOverrides, TVirtuals>>, HydratedDocument<T, TMethodsAndOverrides, TVirtuals>, TQueryHelpers, T>;
Expand Down

0 comments on commit f62cf52

Please sign in to comment.