Skip to content

Commit

Permalink
feat(connection): make transaction() helper reset array atomics after…
Browse files Browse the repository at this point in the history
… failed transaction

Fix #8380
  • Loading branch information
vkarpov15 committed Jul 2, 2020
1 parent 156de0c commit 0edb94d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 7 deletions.
14 changes: 14 additions & 0 deletions lib/connection.js
Expand Up @@ -22,6 +22,7 @@ const utils = require('./utils');

const parseConnectionString = require('mongodb/lib/core').parseConnectionString;

const arrayAtomicsSymbol = require('./helpers/symbols').arrayAtomicsSymbol;
const sessionNewDocuments = require('./helpers/symbols').sessionNewDocuments;

let id = 0;
Expand Down Expand Up @@ -469,6 +470,19 @@ Connection.prototype.transaction = function transaction(fn) {
if (state.hasOwnProperty('versionKey')) {
doc.set(doc.schema.options.versionKey, state.versionKey);
}

for (const path of state.modifiedPaths) {
doc.$__.activePaths.paths[path] = 'modify';
doc.$__.activePaths.states.modify[path] = true;
}

for (const path of state.atomics.keys()) {
const val = doc.$__getValue(path);
if (val == null) {
continue;
}
val[arrayAtomicsSymbol] = state.atomics.get(path);
}
}
delete session[sessionNewDocuments];
throw err;
Expand Down
66 changes: 65 additions & 1 deletion lib/plugins/trackTransaction.js
@@ -1,5 +1,6 @@
'use strict';

const arrayAtomicsSymbol = require('../helpers/symbols').arrayAtomicsSymbol;
const sessionNewDocuments = require('../helpers/symbols').sessionNewDocuments;

module.exports = function trackTransaction(schema) {
Expand All @@ -21,7 +22,70 @@ module.exports = function trackTransaction(schema) {
initialState.versionKey = this.get(this.schema.options.versionKey);
}

initialState.modifiedPaths = new Set(Object.keys(this.$__.activePaths.states.modify));
initialState.atomics = _getAtomics(this);

session[sessionNewDocuments].set(this, initialState);
} else {
const state = session[sessionNewDocuments].get(this);

for (const path of Object.keys(this.$__.activePaths.states.modify)) {
state.modifiedPaths.add(path);
}
state.atomics = _getAtomics(this, state.atomics);
}
});
};
};

function _getAtomics(doc, previous) {
const pathToAtomics = new Map();
previous = previous || new Map();

const pathsToCheck = Object.keys(doc.$__.activePaths.init).concat(Object.keys(doc.$__.activePaths.modify));

for (const path of pathsToCheck) {
const val = doc.$__getValue(path);
if (val != null &&
val instanceof Array &&
val.isMongooseDocumentArray &&
val.length &&
val[arrayAtomicsSymbol] != null &&
Object.keys(val[arrayAtomicsSymbol]).length > 0) {
const existing = previous.get(path) || {};
pathToAtomics.set(path, mergeAtomics(existing, val[arrayAtomicsSymbol]));
}
}

const dirty = doc.$__dirty();
for (const dirt of dirty) {
const path = dirt.path;

const val = dirt.value;
if (val != null && val[arrayAtomicsSymbol] != null && Object.keys(val[arrayAtomicsSymbol]).length > 0) {
const existing = previous.get(path) || {};
pathToAtomics.set(path, mergeAtomics(existing, val[arrayAtomicsSymbol]));
}
}

return pathToAtomics;
}

function mergeAtomics(destination, source) {
destination = destination || {};

if (source.$pullAll != null) {
destination.$pullAll = (destination.$pullAll || []).concat(source.$pullAll);
}
if (source.$push != null) {
destination.$push = destination.$push || {};
destination.$push.$each = (destination.$push.$each || []).concat(source.$push.$each);
}
if (source.$addToSet != null) {
destination.$addToSet = (destination.$addToSet || []).concat(source.$addToSet);
}
if (source.$set != null) {
destination.$set = Object.assign(destination.$set, source.$set);
}

return destination;
}
29 changes: 23 additions & 6 deletions test/docs/transactions.test.js
Expand Up @@ -386,22 +386,39 @@ describe('transactions', function() {

it('can save document after aborted transaction (gh-8380)', function() {
return co(function*() {
const schema = Schema({ name: String, arr: [String] });
const schema = Schema({ name: String, arr: [String], arr2: [String] });

const Test = db.model('gh8380', schema);

yield Test.createCollection();
yield Test.create({ name: 'foo', arr: ['bar'] });
yield Test.create({ name: 'foo', arr: ['bar'], arr2: ['foo'] });
const doc = yield Test.findOne();
yield db.
transaction(session => co(function*() {
doc.arr.pop();
doc.arr.pull('bar');
doc.arr2.push('bar');

yield doc.save({ session });

doc.name = 'baz';
throw new Error('Oops');
})).
catch(err => assert.equal(err.message, 'Oops'));
doc.set('arr.0', 'qux');
yield doc.save();
catch(err => {
assert.equal(err.message, 'Oops');
});

const changes = doc.$__delta()[1];
assert.equal(changes.$set.name, 'baz');
assert.deepEqual(changes.$pullAll.arr, ['bar']);
assert.deepEqual(changes.$push.arr2, { $each: ['bar'] });
assert.ok(!changes.$set.arr2);

yield doc.save({ session: null });

const newDoc = yield Test.collection.findOne();
assert.equal(newDoc.name, 'baz');
assert.deepEqual(newDoc.arr, []);
assert.deepEqual(newDoc.arr2, ['foo', 'bar']);
});
});
});

0 comments on commit 0edb94d

Please sign in to comment.