This commit is contained in:
2020-08-20 11:44:32 +02:00
parent 4715fc1814
commit 6aceefeb2f
2891 changed files with 11239 additions and 347539 deletions

View File

@@ -22,6 +22,9 @@ const utils = require('./utils');
const parseConnectionString = require('mongodb/lib/core').parseConnectionString;
const arrayAtomicsSymbol = require('./helpers/symbols').arrayAtomicsSymbol;
const sessionNewDocuments = require('./helpers/symbols').sessionNewDocuments;
let id = 0;
/*!
@@ -417,6 +420,76 @@ Connection.prototype.startSession = _wrapConnHelper(function startSession(option
cb(null, session);
});
/**
* _Requires MongoDB >= 3.6.0._ Executes the wrapped async function
* in a transaction. Mongoose will commit the transaction if the
* async function executes successfully and attempt to retry if
* there was a retriable error.
*
* Calls the MongoDB driver's [`session.withTransaction()`](http://mongodb.github.io/node-mongodb-native/3.5/api/ClientSession.html#withTransaction),
* but also handles resetting Mongoose document state as shown below.
*
* ####Example:
*
* const doc = new Person({ name: 'Will Riker' });
* await db.transaction(async function setRank(session) {
* doc.rank = 'Captain';
* await doc.save({ session });
* doc.isNew; // false
*
* // Throw an error to abort the transaction
* throw new Error('Oops!');
* }).catch(() => {});
*
* // true, `transaction()` reset the document's state because the
* // transaction was aborted.
* doc.isNew;
*
* @method transaction
* @param {Function} fn Function to execute in a transaction
* @return {Promise<Any>} promise that resolves to the returned value of `fn`
* @api public
*/
Connection.prototype.transaction = function transaction(fn) {
return this.startSession().then(session => {
session[sessionNewDocuments] = new Map();
return session.withTransaction(() => fn(session)).
then(res => {
delete session[sessionNewDocuments];
return res;
}).
catch(err => {
// If transaction was aborted, we need to reset newly
// inserted documents' `isNew`.
for (const doc of session[sessionNewDocuments].keys()) {
const state = session[sessionNewDocuments].get(doc);
if (state.hasOwnProperty('isNew')) {
doc.isNew = state.isNew;
}
if (state.hasOwnProperty('versionKey')) {
doc.set(doc.schema.options.versionKey, state.versionKey);
}
for (const path of state.modifiedPaths) {
doc.$__.activePaths.paths[path] = 'modify';
doc.$__.activePaths.states.modify[path] = true;
}
for (const path of state.atomics.keys()) {
const val = doc.$__getValue(path);
if (val == null) {
continue;
}
val[arrayAtomicsSymbol] = state.atomics.get(path);
}
}
delete session[sessionNewDocuments];
throw err;
});
});
};
/**
* Helper for `dropCollection()`. Will delete the given collection, including
* all documents and indexes.
@@ -552,7 +625,6 @@ Connection.prototype.onOpen = function() {
* @param {Number} [options.reconnectTries=30] If you're connected to a single server or mongos proxy (as opposed to a replica set), the MongoDB driver will try to reconnect every `reconnectInterval` milliseconds for `reconnectTries` times, and give up afterward. When the driver gives up, the mongoose connection emits a `reconnectFailed` event. This option does nothing for replica set connections.
* @param {Number} [options.reconnectInterval=1000] See `reconnectTries` option above.
* @param {Class} [options.promiseLibrary] Sets the [underlying driver's promise library](http://mongodb.github.io/node-mongodb-native/3.1/api/MongoClient.html).
* @param {Number} [options.poolSize=5] The maximum number of sockets the MongoDB driver will keep open for this connection. By default, `poolSize` is 5. Keep in mind that, as of MongoDB 3.4, MongoDB only allows one operation per socket at a time, so you may want to increase this if you find you have a few slow queries that are blocking faster queries from proceeding. See [Slow Trains in MongoDB and Node.js](http://thecodebarbarian.com/slow-trains-in-mongodb-and-nodejs).
* @param {Number} [options.bufferMaxEntries] This option does nothing if `useUnifiedTopology` is set. The MongoDB driver also has its own buffering mechanism that kicks in when the driver is disconnected. Set this option to 0 and set `bufferCommands` to `false` on your schemas if you want your database operations to fail immediately when the driver is not connected, as opposed to waiting for reconnection.
* @param {Number} [options.connectTimeoutMS=30000] How long the MongoDB driver will wait before killing a socket due to inactivity _during initial connection_. Defaults to 30000. This option is passed transparently to [Node.js' `socket#setTimeout()` function](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback).
* @param {Number} [options.socketTimeoutMS=30000] How long the MongoDB driver will wait before killing a socket due to inactivity _after initial connection_. A socket may be inactive because of either no activity or a long-running operation. This is set to `30000` by default, you should set this to 2-3x your longest running operation if you expect some of your database operations to run longer than 20 seconds. This option is passed to [Node.js `socket#setTimeout()` function](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback) after the MongoDB driver successfully completes.
@@ -563,9 +635,6 @@ Connection.prototype.onOpen = function() {
*/
Connection.prototype.openUri = function(uri, options, callback) {
this.readyState = STATES.connecting;
this._closeCalled = false;
if (typeof options === 'function') {
callback = options;
options = null;
@@ -590,6 +659,23 @@ Connection.prototype.openUri = function(uri, options, callback) {
typeof callback + '"');
}
if (this.readyState === STATES.connecting || this.readyState === STATES.connected) {
if (this._connectionString !== uri) {
throw new MongooseError('Can\'t call `openUri()` on an active connection with ' +
'different connection strings. Make sure you aren\'t calling `mongoose.connect()` ' +
'multiple times. See: https://mongoosejs.com/docs/connections.html#multiple_connections');
}
if (typeof callback === 'function') {
callback(null, this);
}
return this;
}
this._connectionString = uri;
this.readyState = STATES.connecting;
this._closeCalled = false;
const Promise = PromiseProvider.get();
const _this = this;
@@ -631,7 +717,9 @@ Connection.prototype.openUri = function(uri, options, callback) {
delete options.pass;
if (options.bufferCommands != null) {
options.bufferMaxEntries = 0;
if (options.bufferMaxEntries == null) {
options.bufferMaxEntries = 0;
}
this.config.bufferCommands = options.bufferCommands;
delete options.bufferCommands;
}
@@ -694,18 +782,6 @@ Connection.prototype.openUri = function(uri, options, callback) {
});
});
const _handleReconnect = () => {
// If we aren't disconnected, we assume this reconnect is due to a
// socket timeout. If there's no activity on a socket for
// `socketTimeoutMS`, the driver will attempt to reconnect and emit
// this event.
if (_this.readyState !== STATES.connected) {
_this.readyState = STATES.connected;
_this.emit('reconnect');
_this.emit('reconnected');
}
};
const promise = new Promise((resolve, reject) => {
const client = new mongodb.MongoClient(uri, options);
_this.client = client;
@@ -715,111 +791,9 @@ Connection.prototype.openUri = function(uri, options, callback) {
return reject(error);
}
const db = dbName != null ? client.db(dbName) : client.db();
_this.db = db;
// `useUnifiedTopology` events
const type = get(db, 's.topology.s.description.type', '');
if (options.useUnifiedTopology) {
if (type === 'Single') {
const server = Array.from(db.s.topology.s.servers.values())[0];
server.s.topology.on('serverHeartbeatSucceeded', () => {
_handleReconnect();
});
server.s.pool.on('reconnect', () => {
_handleReconnect();
});
client.on('serverDescriptionChanged', ev => {
const newDescription = ev.newDescription;
if (newDescription.type === 'Standalone') {
_handleReconnect();
} else {
_this.readyState = STATES.disconnected;
}
});
} else if (type.startsWith('ReplicaSet')) {
client.on('topologyDescriptionChanged', ev => {
// Emit disconnected if we've lost connectivity to _all_ servers
// in the replica set.
const description = ev.newDescription;
const servers = Array.from(ev.newDescription.servers.values());
const allServersDisconnected = description.type === 'ReplicaSetNoPrimary' &&
servers.reduce((cur, d) => cur || d.type === 'Unknown', false);
if (_this.readyState === STATES.connected && allServersDisconnected) {
// Implicitly emits 'disconnected'
_this.readyState = STATES.disconnected;
} else if (_this.readyState === STATES.disconnected && !allServersDisconnected) {
_handleReconnect();
}
});
db.on('close', function() {
const type = get(db, 's.topology.s.description.type', '');
if (type !== 'ReplicaSetWithPrimary') {
// Implicitly emits 'disconnected'
_this.readyState = STATES.disconnected;
}
});
}
}
// Backwards compat for mongoose 4.x
db.on('reconnect', function() {
_handleReconnect();
});
db.s.topology.on('reconnectFailed', function() {
_this.emit('reconnectFailed');
});
if (!options.useUnifiedTopology) {
db.s.topology.on('left', function(data) {
_this.emit('left', data);
});
}
db.s.topology.on('joined', function(data) {
_this.emit('joined', data);
});
db.s.topology.on('fullsetup', function(data) {
_this.emit('fullsetup', data);
});
if (get(db, 's.topology.s.coreTopology.s.pool') != null) {
db.s.topology.s.coreTopology.s.pool.on('attemptReconnect', function() {
_this.emit('attemptReconnect');
});
}
if (!options.useUnifiedTopology || !type.startsWith('ReplicaSet')) {
db.on('close', function() {
// Implicitly emits 'disconnected'
_this.readyState = STATES.disconnected;
});
}
if (!options.useUnifiedTopology) {
client.on('left', function() {
if (_this.readyState === STATES.connected &&
get(db, 's.topology.s.coreTopology.s.replicaSetState.topologyType') === 'ReplicaSetNoPrimary') {
_this.readyState = STATES.disconnected;
}
});
}
db.on('timeout', function() {
_this.emit('timeout');
});
delete _this.then;
delete _this.catch;
_this.readyState = STATES.connected;
for (const i in _this.collections) {
if (utils.object.hasOwnProperty(_this.collections, i)) {
_this.collections[i].onOpen();
}
}
_setClient(_this, client, options, dbName);
resolve(_this);
_this.emit('open');
});
});
@@ -853,6 +827,126 @@ Connection.prototype.openUri = function(uri, options, callback) {
return this;
};
function _setClient(conn, client, options, dbName) {
const db = dbName != null ? client.db(dbName) : client.db();
conn.db = db;
conn.client = client;
const _handleReconnect = () => {
// If we aren't disconnected, we assume this reconnect is due to a
// socket timeout. If there's no activity on a socket for
// `socketTimeoutMS`, the driver will attempt to reconnect and emit
// this event.
if (conn.readyState !== STATES.connected) {
conn.readyState = STATES.connected;
conn.emit('reconnect');
conn.emit('reconnected');
conn.onOpen();
}
};
// `useUnifiedTopology` events
const type = get(db, 's.topology.s.description.type', '');
if (options.useUnifiedTopology) {
if (type === 'Single') {
const server = Array.from(db.s.topology.s.servers.values())[0];
server.s.topology.on('serverHeartbeatSucceeded', () => {
_handleReconnect();
});
server.s.pool.on('reconnect', () => {
_handleReconnect();
});
client.on('serverDescriptionChanged', ev => {
const newDescription = ev.newDescription;
if (newDescription.type === 'Standalone') {
_handleReconnect();
} else {
conn.readyState = STATES.disconnected;
}
});
} else if (type.startsWith('ReplicaSet')) {
client.on('topologyDescriptionChanged', ev => {
// Emit disconnected if we've lost connectivity to _all_ servers
// in the replica set.
const description = ev.newDescription;
const servers = Array.from(ev.newDescription.servers.values());
const allServersDisconnected = description.type === 'ReplicaSetNoPrimary' &&
servers.reduce((cur, d) => cur || d.type === 'Unknown', false);
if (conn.readyState === STATES.connected && allServersDisconnected) {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
} else if (conn.readyState === STATES.disconnected && !allServersDisconnected) {
_handleReconnect();
}
});
db.on('close', function() {
const type = get(db, 's.topology.s.description.type', '');
if (type !== 'ReplicaSetWithPrimary') {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
}
});
}
}
// Backwards compat for mongoose 4.x
db.on('reconnect', function() {
_handleReconnect();
});
db.s.topology.on('reconnectFailed', function() {
conn.emit('reconnectFailed');
});
if (!options.useUnifiedTopology) {
db.s.topology.on('left', function(data) {
conn.emit('left', data);
});
}
db.s.topology.on('joined', function(data) {
conn.emit('joined', data);
});
db.s.topology.on('fullsetup', function(data) {
conn.emit('fullsetup', data);
});
if (get(db, 's.topology.s.coreTopology.s.pool') != null) {
db.s.topology.s.coreTopology.s.pool.on('attemptReconnect', function() {
conn.emit('attemptReconnect');
});
}
if (!options.useUnifiedTopology || !type.startsWith('ReplicaSet')) {
db.on('close', function() {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
});
}
if (!options.useUnifiedTopology) {
client.on('left', function() {
if (conn.readyState === STATES.connected &&
get(db, 's.topology.s.coreTopology.s.replicaSetState.topologyType') === 'ReplicaSetNoPrimary') {
conn.readyState = STATES.disconnected;
}
});
}
db.on('timeout', function() {
conn.emit('timeout');
});
delete conn.then;
delete conn.catch;
conn.readyState = STATES.connected;
for (const i in conn.collections) {
if (utils.object.hasOwnProperty(conn.collections, i)) {
conn.collections[i].onOpen();
}
}
conn.emit('open');
}
/*!
* ignore
*/
@@ -1267,6 +1361,57 @@ Connection.prototype.optionsProvideAuthenticationData = function(options) {
((options.pass) || this.authMechanismDoesNotRequirePassword());
};
/**
* Returns the [MongoDB driver `MongoClient`](http://mongodb.github.io/node-mongodb-native/3.5/api/MongoClient.html) instance
* that this connection uses to talk to MongoDB.
*
* ####Example:
* const conn = await mongoose.createConnection('mongodb://localhost:27017/test');
*
* conn.getClient(); // MongoClient { ... }
*
* @api public
* @return {MongoClient}
*/
Connection.prototype.getClient = function getClient() {
return this.client;
};
/**
* Set the [MongoDB driver `MongoClient`](http://mongodb.github.io/node-mongodb-native/3.5/api/MongoClient.html) instance
* that this connection uses to talk to MongoDB. This is useful if you already have a MongoClient instance, and want to
* reuse it.
*
* ####Example:
* const client = await mongodb.MongoClient.connect('mongodb://localhost:27017/test');
*
* const conn = mongoose.createConnection().setClient(client);
*
* conn.getClient(); // MongoClient { ... }
* conn.readyState; // 1, means 'CONNECTED'
*
* @api public
* @return {Connection} this
*/
Connection.prototype.setClient = function setClient(client) {
if (!(client instanceof mongodb.MongoClient)) {
throw new MongooseError('Must call `setClient()` with an instance of MongoClient');
}
if (this.client != null || this.readyState !== STATES.disconnected) {
throw new MongooseError('Cannot call `setClient()` on a connection that is already connected.');
}
if (!client.isConnected()) {
throw new MongooseError('Cannot call `setClient()` with a MongoClient that is not connected.');
}
this._connectionString = client.s.url;
_setClient(this, client, { useUnifiedTopology: client.s.options.useUnifiedTopology }, client.s.options.dbName);
return this;
};
/**
* Switches to a different database using the same connection pool.
*