Skip to content

Commit

Permalink
feat(connection): add transaction() function to handle resetting `D…
Browse files Browse the repository at this point in the history
…ocument#isNew` when a transaction is aborted

Fix #8852
  • Loading branch information
vkarpov15 committed May 16, 2020
1 parent 59f0024 commit a262140
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 2 deletions.
53 changes: 53 additions & 0 deletions lib/connection.js
Expand Up @@ -22,6 +22,8 @@ const utils = require('./utils');

const parseConnectionString = require('mongodb/lib/core').parseConnectionString;

const sessionNewDocuments = require('./helpers/symbols').sessionNewDocuments;

let id = 0;

/*!
Expand Down Expand Up @@ -417,6 +419,57 @@ Connection.prototype.startSession = _wrapConnHelper(function startSession(option
cb(null, session);
});

/**
* _Requires MongoDB >= 3.6.0._ Executes the wrapped async function
* in a transaction. Mongoose will commit the transaction if the
* async function executes successfully and attempt to retry if
* there was a retriable error.
*
* Calls the MongoDB driver's [`session.withTransaction()`](http://mongodb.github.io/node-mongodb-native/3.5/api/ClientSession.html#withTransaction),
* but also handles resetting Mongoose document state as shown below.
*
* ####Example:
*
* const doc = new Person({ name: 'Will Riker' });
* await db.transaction(async function setRank(session) {
* doc.rank = 'Captain';
* await doc.save({ session });
* doc.isNew; // false
*
* // Throw an error to abort the transaction
* throw new Error('Oops!');
* }).catch(() => {});
*
* // true, `transaction()` reset the document's state because the
* // transaction was aborted.
* doc.isNew;
*
* @method transaction
* @param {Function} fn Function to execute in a transaction
* @return {Promise<Any>} promise that resolves to the returned value of `fn`
* @api public
*/

Connection.prototype.transaction = function transaction(fn) {
return this.startSession().then(session => {
session[sessionNewDocuments] = [];
return session.withTransaction(() => fn(session)).
then(res => {
delete session[sessionNewDocuments];
return res;
}).
catch(err => {
// If transaction was aborted, we need to reset newly
// inserted documents' `isNew`.
for (const doc of session[sessionNewDocuments]) {
doc.isNew = true;
}
delete session[sessionNewDocuments];
throw err;
});
});
};

/**
* Helper for `dropCollection()`. Will delete the given collection, including
* all documents and indexes.
Expand Down
1 change: 1 addition & 0 deletions lib/helpers/symbols.js
Expand Up @@ -11,4 +11,5 @@ exports.modelSymbol = Symbol('mongoose#Model');
exports.objectIdSymbol = Symbol('mongoose#ObjectId');
exports.populateModelSymbol = Symbol('mongoose.PopulateOptions#Model');
exports.schemaTypeSymbol = Symbol('mongoose#schemaType');
exports.sessionNewDocuments = Symbol('mongoose:ClientSession#newDocuments');
exports.validatorErrorSymbol = Symbol('mongoose:validatorError');
4 changes: 3 additions & 1 deletion lib/index.js
Expand Up @@ -34,6 +34,7 @@ const pkg = require('../package.json');
const cast = require('./cast');
const removeSubdocs = require('./plugins/removeSubdocs');
const saveSubdocs = require('./plugins/saveSubdocs');
const trackTransaction = require('./plugins/trackTransaction');
const validateBeforeSave = require('./plugins/validateBeforeSave');

const Aggregate = require('./aggregate');
Expand Down Expand Up @@ -106,7 +107,8 @@ function Mongoose(options) {
[saveSubdocs, { deduplicate: true }],
[validateBeforeSave, { deduplicate: true }],
[shardingPlugin, { deduplicate: true }],
[removeSubdocs, { deduplicate: true }]
[removeSubdocs, { deduplicate: true }],
[trackTransaction, { deduplicate: true }]
]
});
}
Expand Down
20 changes: 20 additions & 0 deletions lib/plugins/trackTransaction.js
@@ -0,0 +1,20 @@
'use strict';

const sessionNewDocuments = require('../helpers/symbols').sessionNewDocuments;

module.exports = function trackTransaction(schema) {
schema.pre('save', function() {
if (!this.isNew) {
return;
}

const session = this.$session();
if (session == null) {
return;
}
if (session.transaction == null || session[sessionNewDocuments] == null) {
return;
}
session[sessionNewDocuments].push(this);
});
};
21 changes: 20 additions & 1 deletion test/docs/transactions.test.js
Expand Up @@ -360,7 +360,26 @@ describe('transactions', function() {
const test = yield Test.create([{}], { session }).then(res => res[0]);
yield test.save(); // throws DocumentNotFoundError
}));
yield session.endSession();
session.endSession();
});
});

it('correct `isNew` after abort (gh-8852)', function() {
return co(function*() {
const schema = Schema({ name: String });

const Test = db.model('gh8852', schema);

yield Test.createCollection();
const doc = new Test({ name: 'foo' });
yield db.
transaction(session => co(function*() {
yield doc.save({ session });
assert.ok(!doc.isNew);
throw new Error('Oops');
})).
catch(err => assert.equal(err.message, 'Oops'));
assert.ok(doc.isNew);
});
});
});

0 comments on commit a262140

Please sign in to comment.