diff --git a/lib/cursor/ChangeStream.js b/lib/cursor/ChangeStream.js index 15dbcadcbfb..dc1ad500d79 100644 --- a/lib/cursor/ChangeStream.js +++ b/lib/cursor/ChangeStream.js @@ -20,6 +20,13 @@ class ChangeStream extends EventEmitter { this.pipeline = pipeline; this.options = options; + if (options && options.hydrate && !options.model) { + throw new Error( + 'Cannot create change stream with `hydrate: true` ' + + 'unless calling `Model.watch()`' + ); + } + // This wrapper is necessary because of buffering. changeStreamThunk((err, driverChangeStream) => { if (err != null) { @@ -46,7 +53,12 @@ class ChangeStream extends EventEmitter { }); ['close', 'change', 'end', 'error'].forEach(ev => { - this.driverChangeStream.on(ev, data => this.emit(ev, data)); + this.driverChangeStream.on(ev, data => { + if (data.fullDocument != null && this.options && this.options.hydrate) { + data.fullDocument = this.options.model.hydrate(data.fullDocument); + } + this.emit(ev, data); + }); }); }); @@ -69,6 +81,32 @@ class ChangeStream extends EventEmitter { } next(cb) { + if (this.options && this.options.hydrate) { + if (cb != null) { + const originalCb = cb; + cb = (err, data) => { + if (err != null) { + return originalCb(err); + } + if (data.fullDocument != null) { + data.fullDocument = this.options.model.hydrate(data.fullDocument); + } + return originalCb(null, data); + }; + } + + let maybePromise = this.driverChangeStream.next(cb); + if (maybePromise && typeof maybePromise.then === 'function') { + maybePromise = maybePromise.then(data => { + if (data.fullDocument != null) { + data.fullDocument = this.options.model.hydrate(data.fullDocument); + } + return data; + }); + } + return maybePromise; + } + return this.driverChangeStream.next(cb); } diff --git a/lib/model.js b/lib/model.js index 36eb73bbdab..2d3e22ca50c 100644 --- a/lib/model.js +++ b/lib/model.js @@ -3177,6 +3177,7 @@ Model.create = function create(doc, options, callback) { * * @param {Array} [pipeline] * @param {Object} [options] see the [mongodb driver options](https://mongodb.github.io/node-mongodb-native/3.0/api/Collection.html#watch) + * @param {Boolean} [options.hydrate=false] if true and `fullDocument: 'updateLookup'` is set, Mongoose will automatically hydrate `fullDocument` into a fully fledged Mongoose document * @return {ChangeStream} mongoose-specific change stream wrapper, inherits from EventEmitter * @api public */ @@ -3201,6 +3202,9 @@ Model.watch = function(pipeline, options) { } }; + options = options || {}; + options.model = this; + return new ChangeStream(changeStreamThunk, pipeline, options); }; diff --git a/test/model.test.js b/test/model.test.js index 01585f75b9f..8bc69432126 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -5197,6 +5197,27 @@ describe('Model', function() { doc._id.toHexString()); }); + it('fullDocument (gh-11936)', async function() { + const MyModel = db.model('Test', new Schema({ name: String })); + + const changeStream = await MyModel.watch([], { + fullDocument: 'updateLookup', + hydrate: true + }); + + const doc = await MyModel.create({ name: 'Ned Stark' }); + + const p = changeStream.next(); + await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' }); + + const changeData = await p; + assert.equal(changeData.operationType, 'update'); + assert.equal(changeData.fullDocument._id.toHexString(), + doc._id.toHexString()); + assert.ok(changeData.fullDocument.$__); + assert.equal(changeData.fullDocument.get('name'), 'Tony Stark'); + }); + it('respects discriminators (gh-11007)', async function() { const BaseModel = db.model('Test', new Schema({ name: String })); const ChildModel = BaseModel.discriminator('Test1', new Schema({ email: String })); diff --git a/types/models.d.ts b/types/models.d.ts index 9fae6afa66e..93951ccad2e 100644 --- a/types/models.d.ts +++ b/types/models.d.ts @@ -288,7 +288,7 @@ declare module 'mongoose' { validate(optional: any, pathsToValidate: PathsToValidate, callback?: CallbackWithoutResult): Promise; /** Watches the underlying collection for changes using [MongoDB change streams](https://docs.mongodb.com/manual/changeStreams/). */ - watch(pipeline?: Array>, options?: mongodb.ChangeStreamOptions): mongodb.ChangeStream; + watch(pipeline?: Array>, options?: mongodb.ChangeStreamOptions & { hydrate?: boolean }): mongodb.ChangeStream; /** Adds a `$where` clause to this query */ $where(argument: string | Function): QueryWithHelpers>, HydratedDocument, TQueryHelpers, T>;