Skip to content

Commit

Permalink
feat(connection): add getClient() and setClient() function for in…
Browse files Browse the repository at this point in the history
…teracting with a connection's underlying MongoClient instance

Fix Automattic#9164
  • Loading branch information
vkarpov15 committed Jul 1, 2020
1 parent ab0a38b commit 156de0c
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 115 deletions.
285 changes: 170 additions & 115 deletions lib/connection.js
Expand Up @@ -753,18 +753,6 @@ Connection.prototype.openUri = function(uri, options, callback) {
});
});

const _handleReconnect = () => {
// If we aren't disconnected, we assume this reconnect is due to a
// socket timeout. If there's no activity on a socket for
// `socketTimeoutMS`, the driver will attempt to reconnect and emit
// this event.
if (_this.readyState !== STATES.connected) {
_this.readyState = STATES.connected;
_this.emit('reconnect');
_this.emit('reconnected');
}
};

const promise = new Promise((resolve, reject) => {
if (_this.client != null) {
_this.client.close();
Expand All @@ -778,111 +766,9 @@ Connection.prototype.openUri = function(uri, options, callback) {
return reject(error);
}

const db = dbName != null ? client.db(dbName) : client.db();
_this.db = db;

// `useUnifiedTopology` events
const type = get(db, 's.topology.s.description.type', '');
if (options.useUnifiedTopology) {
if (type === 'Single') {
const server = Array.from(db.s.topology.s.servers.values())[0];

server.s.topology.on('serverHeartbeatSucceeded', () => {
_handleReconnect();
});
server.s.pool.on('reconnect', () => {
_handleReconnect();
});
client.on('serverDescriptionChanged', ev => {
const newDescription = ev.newDescription;
if (newDescription.type === 'Standalone') {
_handleReconnect();
} else {
_this.readyState = STATES.disconnected;
}
});
} else if (type.startsWith('ReplicaSet')) {
client.on('topologyDescriptionChanged', ev => {
// Emit disconnected if we've lost connectivity to _all_ servers
// in the replica set.
const description = ev.newDescription;
const servers = Array.from(ev.newDescription.servers.values());
const allServersDisconnected = description.type === 'ReplicaSetNoPrimary' &&
servers.reduce((cur, d) => cur || d.type === 'Unknown', false);
if (_this.readyState === STATES.connected && allServersDisconnected) {
// Implicitly emits 'disconnected'
_this.readyState = STATES.disconnected;
} else if (_this.readyState === STATES.disconnected && !allServersDisconnected) {
_handleReconnect();
}
});

db.on('close', function() {
const type = get(db, 's.topology.s.description.type', '');
if (type !== 'ReplicaSetWithPrimary') {
// Implicitly emits 'disconnected'
_this.readyState = STATES.disconnected;
}
});
}
}

// Backwards compat for mongoose 4.x
db.on('reconnect', function() {
_handleReconnect();
});
db.s.topology.on('reconnectFailed', function() {
_this.emit('reconnectFailed');
});

if (!options.useUnifiedTopology) {
db.s.topology.on('left', function(data) {
_this.emit('left', data);
});
}
db.s.topology.on('joined', function(data) {
_this.emit('joined', data);
});
db.s.topology.on('fullsetup', function(data) {
_this.emit('fullsetup', data);
});
if (get(db, 's.topology.s.coreTopology.s.pool') != null) {
db.s.topology.s.coreTopology.s.pool.on('attemptReconnect', function() {
_this.emit('attemptReconnect');
});
}
if (!options.useUnifiedTopology || !type.startsWith('ReplicaSet')) {
db.on('close', function() {
// Implicitly emits 'disconnected'
_this.readyState = STATES.disconnected;
});
}

if (!options.useUnifiedTopology) {
client.on('left', function() {
if (_this.readyState === STATES.connected &&
get(db, 's.topology.s.coreTopology.s.replicaSetState.topologyType') === 'ReplicaSetNoPrimary') {
_this.readyState = STATES.disconnected;
}
});
}

db.on('timeout', function() {
_this.emit('timeout');
});

delete _this.then;
delete _this.catch;
_this.readyState = STATES.connected;

for (const i in _this.collections) {
if (utils.object.hasOwnProperty(_this.collections, i)) {
_this.collections[i].onOpen();
}
}
_setClient(_this, client, options, dbName);

resolve(_this);
_this.emit('open');
});
});

Expand Down Expand Up @@ -916,6 +802,125 @@ Connection.prototype.openUri = function(uri, options, callback) {
return this;
};

function _setClient(conn, client, options, dbName) {
const db = dbName != null ? client.db(dbName) : client.db();
conn.db = db;
conn.client = client;

const _handleReconnect = () => {
// If we aren't disconnected, we assume this reconnect is due to a
// socket timeout. If there's no activity on a socket for
// `socketTimeoutMS`, the driver will attempt to reconnect and emit
// this event.
if (conn.readyState !== STATES.connected) {
conn.readyState = STATES.connected;
conn.emit('reconnect');
conn.emit('reconnected');
}
};

// `useUnifiedTopology` events
const type = get(db, 's.topology.s.description.type', '');
if (options.useUnifiedTopology) {
if (type === 'Single') {
const server = Array.from(db.s.topology.s.servers.values())[0];
server.s.topology.on('serverHeartbeatSucceeded', () => {
_handleReconnect();
});
server.s.pool.on('reconnect', () => {
_handleReconnect();
});
client.on('serverDescriptionChanged', ev => {
const newDescription = ev.newDescription;
if (newDescription.type === 'Standalone') {
_handleReconnect();
} else {
conn.readyState = STATES.disconnected;
}
});
} else if (type.startsWith('ReplicaSet')) {
client.on('topologyDescriptionChanged', ev => {
// Emit disconnected if we've lost connectivity to _all_ servers
// in the replica set.
const description = ev.newDescription;
const servers = Array.from(ev.newDescription.servers.values());
const allServersDisconnected = description.type === 'ReplicaSetNoPrimary' &&
servers.reduce((cur, d) => cur || d.type === 'Unknown', false);
if (conn.readyState === STATES.connected && allServersDisconnected) {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
} else if (conn.readyState === STATES.disconnected && !allServersDisconnected) {
_handleReconnect();
}
});

db.on('close', function() {
const type = get(db, 's.topology.s.description.type', '');
if (type !== 'ReplicaSetWithPrimary') {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
}
});
}
}

// Backwards compat for mongoose 4.x
db.on('reconnect', function() {
_handleReconnect();
});
db.s.topology.on('reconnectFailed', function() {
conn.emit('reconnectFailed');
});

if (!options.useUnifiedTopology) {
db.s.topology.on('left', function(data) {
conn.emit('left', data);
});
}
db.s.topology.on('joined', function(data) {
conn.emit('joined', data);
});
db.s.topology.on('fullsetup', function(data) {
conn.emit('fullsetup', data);
});
if (get(db, 's.topology.s.coreTopology.s.pool') != null) {
db.s.topology.s.coreTopology.s.pool.on('attemptReconnect', function() {
conn.emit('attemptReconnect');
});
}
if (!options.useUnifiedTopology || !type.startsWith('ReplicaSet')) {
db.on('close', function() {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
});
}

if (!options.useUnifiedTopology) {
client.on('left', function() {
if (conn.readyState === STATES.connected &&
get(db, 's.topology.s.coreTopology.s.replicaSetState.topologyType') === 'ReplicaSetNoPrimary') {
conn.readyState = STATES.disconnected;
}
});
}

db.on('timeout', function() {
conn.emit('timeout');
});

delete conn.then;
delete conn.catch;
conn.readyState = STATES.connected;

for (const i in conn.collections) {
if (utils.object.hasOwnProperty(conn.collections, i)) {
conn.collections[i].onOpen();
}
}

conn.emit('open');
}

/*!
* ignore
*/
Expand Down Expand Up @@ -1330,6 +1335,56 @@ Connection.prototype.optionsProvideAuthenticationData = function(options) {
((options.pass) || this.authMechanismDoesNotRequirePassword());
};

/**
* Returns the [MongoDB driver `MongoClient`](http://mongodb.github.io/node-mongodb-native/3.5/api/MongoClient.html) instance
* that this connection uses to talk to MongoDB.
*
* ####Example:
* const conn = await mongoose.createConnection('mongodb://localhost:27017/test');
*
* conn.getClient(); // MongoClient { ... }
*
* @api public
* @return {MongoClient}
*/

Connection.prototype.getClient = function getClient() {
return this.client;
};

/**
* Set the [MongoDB driver `MongoClient`](http://mongodb.github.io/node-mongodb-native/3.5/api/MongoClient.html) instance
* that this connection uses to talk to MongoDB. This is useful if you already have a MongoClient instance, and want to
* reuse it.
*
* ####Example:
* const client = await mongodb.MongoClient.connect('mongodb://localhost:27017/test');
*
* const conn = mongoose.createConnection().setClient(client);
*
* conn.getClient(); // MongoClient { ... }
* conn.readyState; // 1, means 'CONNECTED'
*
* @api public
* @return {Connection} this
*/

Connection.prototype.setClient = function setClient(client) {
if (!(client instanceof mongodb.MongoClient)) {
throw new MongooseError('Must call `setClient()` with an instance of MongoClient');
}
if (this.client != null || this.readyState !== STATES.disconnected) {
throw new MongooseError('Cannot call `setClient()` on a connection that is already connected.');
}
if (!client.isConnected()) {
throw new MongooseError('Cannot call `setClient()` with a MongoClient that is not connected.');
}

_setClient(this, client, { useUnifiedTopology: client.s.options.useUnifiedTopology }, client.s.options.dbName);

return this;
};

/**
* Switches to a different database using the same connection pool.
*
Expand Down
17 changes: 17 additions & 0 deletions test/connection.test.js
Expand Up @@ -10,6 +10,7 @@ const Promise = require('bluebird');
const Q = require('q');
const assert = require('assert');
const co = require('co');
const mongodb = require('mongodb');
const server = require('./common').server;

const mongoose = start.mongoose;
Expand Down Expand Up @@ -1185,4 +1186,20 @@ describe('connections:', function() {
assert.equal(db2.config.useCreateIndex, true);
});
});

it('allows setting client on a disconnected connection (gh-9164)', function() {
return co(function*() {
const client = yield mongodb.MongoClient.connect('mongodb://localhost:27017/mongoose_test', {
useNewUrlParser: true,
useUnifiedTopology: true
});
const conn = mongoose.createConnection().setClient(client);

assert.equal(conn.readyState, 1);

yield conn.createCollection('test');
const res = yield conn.dropCollection('test');
assert.ok(res);
});
});
});

0 comments on commit 156de0c

Please sign in to comment.