Refactoring day1

This commit is contained in:
2020-08-20 20:27:14 +02:00
parent 6aceefeb2f
commit b907489a75
481 changed files with 5321 additions and 5616 deletions

View File

@@ -1,55 +1,158 @@
'use strict';
/**
* Context used during authentication
*
* @property {Connection} connection The connection to authenticate
* @property {MongoCredentials} credentials The credentials to use for authentication
* @property {object} options The options passed to the `connect` method
* @property {object?} response The response of the initial handshake
* @property {Buffer?} nonce A random nonce generated for use in an authentication conversation
*/
class AuthContext {
constructor(connection, credentials, options) {
this.connection = connection;
this.credentials = credentials;
this.options = options;
}
}
const MongoError = require('../error').MongoError;
/**
* Creates a new AuthProvider, which dictates how to authenticate for a given
* mechanism.
* @class
*/
class AuthProvider {
constructor(bson) {
this.bson = bson;
}
/**
* Prepare the handshake document before the initial handshake.
*
* @param {object} handshakeDoc The document used for the initial handshake on a connection
* @param {AuthContext} authContext Context for authentication flow
* @param {function} callback
*/
prepare(handshakeDoc, context, callback) {
callback(undefined, handshakeDoc);
this.authStore = [];
}
/**
* Authenticate
*
* @param {AuthContext} context A shared context for authentication flow
* @method
* @param {SendAuthCommand} sendAuthCommand Writes an auth command directly to a specific connection
* @param {Connection[]} connections Connections to authenticate using this authenticator
* @param {MongoCredentials} credentials Authentication credentials
* @param {authResultCallback} callback The callback to return the result from the authentication
*/
auth(context, callback) {
callback(new TypeError('`auth` method must be overridden by subclass'));
auth(sendAuthCommand, connections, credentials, callback) {
// Total connections
let count = connections.length;
if (count === 0) {
callback(null, null);
return;
}
// Valid connections
let numberOfValidConnections = 0;
let errorObject = null;
const execute = connection => {
this._authenticateSingleConnection(sendAuthCommand, connection, credentials, (err, r) => {
// Adjust count
count = count - 1;
// If we have an error
if (err) {
errorObject = new MongoError(err);
} else if (r && (r.$err || r.errmsg)) {
errorObject = new MongoError(r);
} else {
numberOfValidConnections = numberOfValidConnections + 1;
}
// Still authenticating against other connections.
if (count !== 0) {
return;
}
// We have authenticated all connections
if (numberOfValidConnections > 0) {
// Store the auth details
this.addCredentials(credentials);
// Return correct authentication
callback(null, true);
} else {
if (errorObject == null) {
errorObject = new MongoError(`failed to authenticate using ${credentials.mechanism}`);
}
callback(errorObject, false);
}
});
};
const executeInNextTick = _connection => process.nextTick(() => execute(_connection));
// For each connection we need to authenticate
while (connections.length > 0) {
executeInNextTick(connections.shift());
}
}
/**
* Implementation of a single connection authenticating. Is meant to be overridden.
* Will error if called directly
* @ignore
*/
_authenticateSingleConnection(/*sendAuthCommand, connection, credentials, callback*/) {
throw new Error('_authenticateSingleConnection must be overridden');
}
/**
* Adds credentials to store only if it does not exist
* @param {MongoCredentials} credentials credentials to add to store
*/
addCredentials(credentials) {
const found = this.authStore.some(cred => cred.equals(credentials));
if (!found) {
this.authStore.push(credentials);
}
}
/**
* Re authenticate pool
* @method
* @param {SendAuthCommand} sendAuthCommand Writes an auth command directly to a specific connection
* @param {Connection[]} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
*/
reauthenticate(sendAuthCommand, connections, callback) {
const authStore = this.authStore.slice(0);
let count = authStore.length;
if (count === 0) {
return callback(null, null);
}
for (let i = 0; i < authStore.length; i++) {
this.auth(sendAuthCommand, connections, authStore[i], function(err) {
count = count - 1;
if (count === 0) {
callback(err, null);
}
});
}
}
/**
* Remove credentials that have been previously stored in the auth provider
* @method
* @param {string} source Name of database we are removing authStore details about
* @return {object}
*/
logout(source) {
this.authStore = this.authStore.filter(credentials => credentials.source !== source);
}
}
/**
* This is a result from an authentication provider
* A function that writes authentication commands to a specific connection
* @callback SendAuthCommand
* @param {Connection} connection The connection to write to
* @param {Command} command A command with a toBin method that can be written to a connection
* @param {AuthWriteCallback} callback Callback called when command response is received
*/
/**
* A callback for a specific auth command
* @callback AuthWriteCallback
* @param {Error} err If command failed, an error from the server
* @param {object} r The response from the server
*/
/**
* This is a result from an authentication strategy
*
* @callback authResultCallback
* @param {error} error An error object. Set to null if no error present
* @param {boolean} result The result of the authentication process
*/
module.exports = { AuthContext, AuthProvider };
module.exports = { AuthProvider };

View File

@@ -4,9 +4,9 @@ const MongoCR = require('./mongocr');
const X509 = require('./x509');
const Plain = require('./plain');
const GSSAPI = require('./gssapi');
const SSPI = require('./sspi');
const ScramSHA1 = require('./scram').ScramSHA1;
const ScramSHA256 = require('./scram').ScramSHA256;
const MongoDBAWS = require('./mongodb_aws');
/**
* Returns the default authentication providers.
@@ -16,11 +16,11 @@ const MongoDBAWS = require('./mongodb_aws');
*/
function defaultAuthProviders(bson) {
return {
'mongodb-aws': new MongoDBAWS(bson),
mongocr: new MongoCR(bson),
x509: new X509(bson),
plain: new Plain(bson),
gssapi: new GSSAPI(bson),
sspi: new SSPI(bson),
'scram-sha-1': new ScramSHA1(bson),
'scram-sha-256': new ScramSHA256(bson)
};

View File

@@ -1,22 +1,21 @@
'use strict';
const AuthProvider = require('./auth_provider').AuthProvider;
const retrieveKerberos = require('../utils').retrieveKerberos;
let kerberos;
/**
* Creates a new GSSAPI authentication mechanism
* @class
* @extends AuthProvider
*/
class GSSAPI extends AuthProvider {
auth(authContext, callback) {
const connection = authContext.connection;
const credentials = authContext.credentials;
if (kerberos == null) {
try {
kerberos = retrieveKerberos();
} catch (e) {
return callback(e, null);
}
}
// TODO: Destructure this
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const source = credentials.source;
const username = credentials.username;
const password = credentials.password;
const mechanismProperties = credentials.mechanismProperties;
@@ -25,67 +24,218 @@ class GSSAPI extends AuthProvider {
mechanismProperties['gssapiServiceName'] ||
'mongodb';
const MongoAuthProcess = kerberos.processes.MongoAuthProcess;
const authProcess = new MongoAuthProcess(
connection.host,
connection.port,
GSSAPIInitialize(
this,
kerberos.processes.MongoAuthProcess,
source,
username,
password,
source,
gssapiServiceName,
mechanismProperties
sendAuthCommand,
connection,
mechanismProperties,
callback
);
}
authProcess.init(username, password, err => {
if (err) return callback(err, false);
/**
* Authenticate
* @override
* @method
*/
auth(sendAuthCommand, connections, credentials, callback) {
if (kerberos == null) {
try {
kerberos = retrieveKerberos();
} catch (e) {
return callback(e, null);
}
}
authProcess.transition('', (err, payload) => {
if (err) return callback(err, false);
const command = {
saslStart: 1,
mechanism: 'GSSAPI',
payload,
autoAuthorize: 1
};
connection.command('$external.$cmd', command, (err, result) => {
if (err) return callback(err, false);
const doc = result.result;
authProcess.transition(doc.payload, (err, payload) => {
if (err) return callback(err, false);
const command = {
saslContinue: 1,
conversationId: doc.conversationId,
payload
};
connection.command('$external.$cmd', command, (err, result) => {
if (err) return callback(err, false);
const doc = result.result;
authProcess.transition(doc.payload, (err, payload) => {
if (err) return callback(err, false);
const command = {
saslContinue: 1,
conversationId: doc.conversationId,
payload
};
connection.command('$external.$cmd', command, (err, result) => {
if (err) return callback(err, false);
const response = result.result;
authProcess.transition(null, err => {
if (err) return callback(err, null);
callback(null, response);
});
});
});
});
});
});
});
});
super.auth(sendAuthCommand, connections, credentials, callback);
}
}
//
// Initialize step
var GSSAPIInitialize = function(
self,
MongoAuthProcess,
db,
username,
password,
authdb,
gssapiServiceName,
sendAuthCommand,
connection,
options,
callback
) {
// Create authenticator
var mongo_auth_process = new MongoAuthProcess(
connection.host,
connection.port,
gssapiServiceName,
options
);
// Perform initialization
mongo_auth_process.init(username, password, function(err) {
if (err) return callback(err, false);
// Perform the first step
mongo_auth_process.transition('', function(err, payload) {
if (err) return callback(err, false);
// Call the next db step
MongoDBGSSAPIFirstStep(
self,
mongo_auth_process,
payload,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
);
});
});
};
//
// Perform first step against mongodb
var MongoDBGSSAPIFirstStep = function(
self,
mongo_auth_process,
payload,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
) {
// Build the sasl start command
var command = {
saslStart: 1,
mechanism: 'GSSAPI',
payload: payload,
autoAuthorize: 1
};
// Write the commmand on the connection
sendAuthCommand(connection, '$external.$cmd', command, (err, doc) => {
if (err) return callback(err, false);
// Execute mongodb transition
mongo_auth_process.transition(doc.payload, function(err, payload) {
if (err) return callback(err, false);
// MongoDB API Second Step
MongoDBGSSAPISecondStep(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
);
});
});
};
//
// Perform first step against mongodb
var MongoDBGSSAPISecondStep = function(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
) {
// Build Authentication command to send to MongoDB
var command = {
saslContinue: 1,
conversationId: doc.conversationId,
payload: payload
};
// Execute the command
// Write the commmand on the connection
sendAuthCommand(connection, '$external.$cmd', command, (err, doc) => {
if (err) return callback(err, false);
// Call next transition for kerberos
mongo_auth_process.transition(doc.payload, function(err, payload) {
if (err) return callback(err, false);
// Call the last and third step
MongoDBGSSAPIThirdStep(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
);
});
});
};
var MongoDBGSSAPIThirdStep = function(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
) {
// Build final command
var command = {
saslContinue: 1,
conversationId: doc.conversationId,
payload: payload
};
// Execute the command
sendAuthCommand(connection, '$external.$cmd', command, (err, r) => {
if (err) return callback(err, false);
mongo_auth_process.transition(null, function(err) {
if (err) return callback(err, null);
callback(null, r);
});
});
};
/**
* This is a result from a authentication strategy
*
* @callback authResultCallback
* @param {error} error An error object. Set to null if no error present
* @param {boolean} result The result of the authentication process
*/
module.exports = GSSAPI;

View File

@@ -47,24 +47,7 @@ class MongoCredentials {
this.password = options.password;
this.source = options.source || options.db;
this.mechanism = options.mechanism || 'default';
this.mechanismProperties = options.mechanismProperties || {};
if (this.mechanism.match(/MONGODB-AWS/i)) {
if (this.username == null && process.env.AWS_ACCESS_KEY_ID) {
this.username = process.env.AWS_ACCESS_KEY_ID;
}
if (this.password == null && process.env.AWS_SECRET_ACCESS_KEY) {
this.password = process.env.AWS_SECRET_ACCESS_KEY;
}
if (this.mechanismProperties.AWS_SESSION_TOKEN == null && process.env.AWS_SESSION_TOKEN) {
this.mechanismProperties.AWS_SESSION_TOKEN = process.env.AWS_SESSION_TOKEN;
}
}
Object.freeze(this.mechanismProperties);
Object.freeze(this);
this.mechanismProperties = options.mechanismProperties;
}
/**
@@ -86,21 +69,12 @@ class MongoCredentials {
* based on the server version and server supported sasl mechanisms.
*
* @param {Object} [ismaster] An ismaster response from the server
* @returns {MongoCredentials}
*/
resolveAuthMechanism(ismaster) {
// If the mechanism is not "default", then it does not need to be resolved
if (this.mechanism.match(/DEFAULT/i)) {
return new MongoCredentials({
username: this.username,
password: this.password,
source: this.source,
mechanism: getDefaultAuthMechanism(ismaster),
mechanismProperties: this.mechanismProperties
});
if (this.mechanism.toLowerCase() === 'default') {
this.mechanism = getDefaultAuthMechanism(ismaster);
}
return this;
}
}

View File

@@ -3,21 +3,27 @@
const crypto = require('crypto');
const AuthProvider = require('./auth_provider').AuthProvider;
/**
* Creates a new MongoCR authentication mechanism
*
* @extends AuthProvider
*/
class MongoCR extends AuthProvider {
auth(authContext, callback) {
const connection = authContext.connection;
const credentials = authContext.credentials;
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const username = credentials.username;
const password = credentials.password;
const source = credentials.source;
connection.command(`${source}.$cmd`, { getnonce: 1 }, (err, result) => {
sendAuthCommand(connection, `${source}.$cmd`, { getnonce: 1 }, (err, r) => {
let nonce = null;
let key = null;
// Get nonce
if (err == null) {
const r = result.result;
nonce = r.nonce;
// Use node md5 generator
let md5 = crypto.createHash('md5');
@@ -37,7 +43,7 @@ class MongoCR extends AuthProvider {
key
};
connection.command(`${source}.$cmd`, authenticateCommand, callback);
sendAuthCommand(connection, `${source}.$cmd`, authenticateCommand, callback);
});
}
}

View File

@@ -1,4 +1,5 @@
'use strict';
const retrieveBSON = require('../connection/utils').retrieveBSON;
const AuthProvider = require('./auth_provider').AuthProvider;
@@ -6,13 +7,19 @@ const AuthProvider = require('./auth_provider').AuthProvider;
const BSON = retrieveBSON();
const Binary = BSON.Binary;
/**
* Creates a new Plain authentication mechanism
*
* @extends AuthProvider
*/
class Plain extends AuthProvider {
auth(authContext, callback) {
const connection = authContext.connection;
const credentials = authContext.credentials;
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const username = credentials.username;
const password = credentials.password;
const payload = new Binary(`\x00${username}\x00${password}`);
const command = {
saslStart: 1,
@@ -21,7 +28,7 @@ class Plain extends AuthProvider {
autoAuthorize: 1
};
connection.command('$external.$cmd', command, callback);
sendAuthCommand(connection, '$external.$cmd', command, callback);
}
}

View File

@@ -1,4 +1,5 @@
'use strict';
const crypto = require('crypto');
const Buffer = require('safe-buffer').Buffer;
const retrieveBSON = require('../connection/utils').retrieveBSON;
@@ -15,236 +16,32 @@ try {
// don't do anything;
}
class ScramSHA extends AuthProvider {
constructor(bson, cryptoMethod) {
super(bson);
this.cryptoMethod = cryptoMethod || 'sha1';
}
prepare(handshakeDoc, authContext, callback) {
const cryptoMethod = this.cryptoMethod;
if (cryptoMethod === 'sha256' && saslprep == null) {
console.warn('Warning: no saslprep library specified. Passwords will not be sanitized');
}
crypto.randomBytes(24, (err, nonce) => {
if (err) {
return callback(err);
}
// store the nonce for later use
Object.assign(authContext, { nonce });
const credentials = authContext.credentials;
const request = Object.assign({}, handshakeDoc, {
speculativeAuthenticate: Object.assign(makeFirstMessage(cryptoMethod, credentials, nonce), {
db: credentials.source
})
});
callback(undefined, request);
});
}
auth(authContext, callback) {
const response = authContext.response;
if (response && response.speculativeAuthenticate) {
continueScramConversation(
this.cryptoMethod,
response.speculativeAuthenticate,
authContext,
callback
);
return;
}
executeScram(this.cryptoMethod, authContext, callback);
}
}
function cleanUsername(username) {
return username.replace('=', '=3D').replace(',', '=2C');
}
function clientFirstMessageBare(username, nonce) {
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8.
// Since the username is not sasl-prep-d, we need to do this here.
return Buffer.concat([
Buffer.from('n=', 'utf8'),
Buffer.from(username, 'utf8'),
Buffer.from(',r=', 'utf8'),
Buffer.from(nonce.toString('base64'), 'utf8')
]);
}
function makeFirstMessage(cryptoMethod, credentials, nonce) {
const username = cleanUsername(credentials.username);
const mechanism = cryptoMethod === 'sha1' ? 'SCRAM-SHA-1' : 'SCRAM-SHA-256';
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8.
// Since the username is not sasl-prep-d, we need to do this here.
return {
saslStart: 1,
mechanism,
payload: new Binary(
Buffer.concat([Buffer.from('n,,', 'utf8'), clientFirstMessageBare(username, nonce)])
),
autoAuthorize: 1,
options: { skipEmptyExchange: true }
};
}
function executeScram(cryptoMethod, authContext, callback) {
const connection = authContext.connection;
const credentials = authContext.credentials;
const nonce = authContext.nonce;
const db = credentials.source;
const saslStartCmd = makeFirstMessage(cryptoMethod, credentials, nonce);
connection.command(`${db}.$cmd`, saslStartCmd, (_err, result) => {
const err = resolveError(_err, result);
if (err) {
return callback(err);
}
continueScramConversation(cryptoMethod, result.result, authContext, callback);
});
}
function continueScramConversation(cryptoMethod, response, authContext, callback) {
const connection = authContext.connection;
const credentials = authContext.credentials;
const nonce = authContext.nonce;
const db = credentials.source;
const username = cleanUsername(credentials.username);
const password = credentials.password;
let processedPassword;
if (cryptoMethod === 'sha256') {
processedPassword = saslprep ? saslprep(password) : password;
} else {
try {
processedPassword = passwordDigest(username, password);
} catch (e) {
return callback(e);
}
}
const payload = Buffer.isBuffer(response.payload)
? new Binary(response.payload)
: response.payload;
const dict = parsePayload(payload.value());
const iterations = parseInt(dict.i, 10);
if (iterations && iterations < 4096) {
callback(new MongoError(`Server returned an invalid iteration count ${iterations}`), false);
return;
}
const salt = dict.s;
const rnonce = dict.r;
if (rnonce.startsWith('nonce')) {
callback(new MongoError(`Server returned an invalid nonce: ${rnonce}`), false);
return;
}
// Set up start of proof
const withoutProof = `c=biws,r=${rnonce}`;
const saltedPassword = HI(
processedPassword,
Buffer.from(salt, 'base64'),
iterations,
cryptoMethod
);
const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key');
const serverKey = HMAC(cryptoMethod, saltedPassword, 'Server Key');
const storedKey = H(cryptoMethod, clientKey);
const authMessage = [
clientFirstMessageBare(username, nonce),
payload.value().toString('base64'),
withoutProof
].join(',');
const clientSignature = HMAC(cryptoMethod, storedKey, authMessage);
const clientProof = `p=${xor(clientKey, clientSignature)}`;
const clientFinal = [withoutProof, clientProof].join(',');
const serverSignature = HMAC(cryptoMethod, serverKey, authMessage);
const saslContinueCmd = {
saslContinue: 1,
conversationId: response.conversationId,
payload: new Binary(Buffer.from(clientFinal))
};
connection.command(`${db}.$cmd`, saslContinueCmd, (_err, result) => {
const err = resolveError(_err, result);
if (err) {
return callback(err);
}
const r = result.result;
const parsedResponse = parsePayload(r.payload.value());
if (!compareDigest(Buffer.from(parsedResponse.v, 'base64'), serverSignature)) {
callback(new MongoError('Server returned an invalid signature'));
return;
}
if (!r || r.done !== false) {
return callback(err, r);
}
const retrySaslContinueCmd = {
saslContinue: 1,
conversationId: r.conversationId,
payload: Buffer.alloc(0)
};
connection.command(`${db}.$cmd`, retrySaslContinueCmd, callback);
});
}
function parsePayload(payload) {
const dict = {};
const parts = payload.split(',');
for (let i = 0; i < parts.length; i++) {
const valueParts = parts[i].split('=');
var parsePayload = function(payload) {
var dict = {};
var parts = payload.split(',');
for (var i = 0; i < parts.length; i++) {
var valueParts = parts[i].split('=');
dict[valueParts[0]] = valueParts[1];
}
return dict;
}
};
function passwordDigest(username, password) {
if (typeof username !== 'string') {
throw new MongoError('username must be a string');
}
if (typeof password !== 'string') {
throw new MongoError('password must be a string');
}
if (password.length === 0) {
throw new MongoError('password cannot be empty');
}
const md5 = crypto.createHash('md5');
md5.update(`${username}:mongo:${password}`, 'utf8');
var passwordDigest = function(username, password) {
if (typeof username !== 'string') throw new MongoError('username must be a string');
if (typeof password !== 'string') throw new MongoError('password must be a string');
if (password.length === 0) throw new MongoError('password cannot be empty');
// Use node md5 generator
var md5 = crypto.createHash('md5');
// Generate keys used for authentication
md5.update(username + ':mongo:' + password, 'utf8');
return md5.digest('hex');
}
};
// XOR two buffers
function xor(a, b) {
if (!Buffer.isBuffer(a)) {
a = Buffer.from(a);
}
if (!Buffer.isBuffer(b)) {
b = Buffer.from(b);
}
if (!Buffer.isBuffer(a)) a = Buffer.from(a);
if (!Buffer.isBuffer(b)) b = Buffer.from(b);
const length = Math.max(a.length, b.length);
const res = [];
@@ -269,12 +66,12 @@ function HMAC(method, key, text) {
.digest();
}
let _hiCache = {};
let _hiCacheCount = 0;
function _hiCachePurge() {
var _hiCache = {};
var _hiCacheCount = 0;
var _hiCachePurge = function() {
_hiCache = {};
_hiCacheCount = 0;
}
};
const hiLengthMap = {
sha256: 32,
@@ -324,19 +121,205 @@ function compareDigest(lhs, rhs) {
return result === 0;
}
function resolveError(err, result) {
if (err) return err;
/**
* Creates a new ScramSHA authentication mechanism
* @class
* @extends AuthProvider
*/
class ScramSHA extends AuthProvider {
constructor(bson, cryptoMethod) {
super(bson);
this.cryptoMethod = cryptoMethod || 'sha1';
}
const r = result.result;
if (r.$err || r.errmsg) return new MongoError(r);
static _getError(err, r) {
if (err) {
return err;
}
if (r.$err || r.errmsg) {
return new MongoError(r);
}
}
/**
* @ignore
*/
_executeScram(sendAuthCommand, connection, credentials, nonce, callback) {
let username = credentials.username;
const password = credentials.password;
const db = credentials.source;
const cryptoMethod = this.cryptoMethod;
let mechanism = 'SCRAM-SHA-1';
let processedPassword;
if (cryptoMethod === 'sha256') {
mechanism = 'SCRAM-SHA-256';
processedPassword = saslprep ? saslprep(password) : password;
} else {
try {
processedPassword = passwordDigest(username, password);
} catch (e) {
return callback(e);
}
}
// Clean up the user
username = username.replace('=', '=3D').replace(',', '=2C');
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8.
// Since the username is not sasl-prep-d, we need to do this here.
const firstBare = Buffer.concat([
Buffer.from('n=', 'utf8'),
Buffer.from(username, 'utf8'),
Buffer.from(',r=', 'utf8'),
Buffer.from(nonce, 'utf8')
]);
// Build command structure
const saslStartCmd = {
saslStart: 1,
mechanism,
payload: new Binary(Buffer.concat([Buffer.from('n,,', 'utf8'), firstBare])),
autoAuthorize: 1
};
// Write the commmand on the connection
sendAuthCommand(connection, `${db}.$cmd`, saslStartCmd, (err, r) => {
let tmpError = ScramSHA._getError(err, r);
if (tmpError) {
return callback(tmpError, null);
}
const payload = Buffer.isBuffer(r.payload) ? new Binary(r.payload) : r.payload;
const dict = parsePayload(payload.value());
const iterations = parseInt(dict.i, 10);
if (iterations && iterations < 4096) {
callback(new MongoError(`Server returned an invalid iteration count ${iterations}`), false);
return;
}
const salt = dict.s;
const rnonce = dict.r;
if (rnonce.startsWith('nonce')) {
callback(new MongoError(`Server returned an invalid nonce: ${rnonce}`), false);
return;
}
// Set up start of proof
const withoutProof = `c=biws,r=${rnonce}`;
const saltedPassword = HI(
processedPassword,
Buffer.from(salt, 'base64'),
iterations,
cryptoMethod
);
const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key');
const serverKey = HMAC(cryptoMethod, saltedPassword, 'Server Key');
const storedKey = H(cryptoMethod, clientKey);
const authMessage = [firstBare, payload.value().toString('base64'), withoutProof].join(',');
const clientSignature = HMAC(cryptoMethod, storedKey, authMessage);
const clientProof = `p=${xor(clientKey, clientSignature)}`;
const clientFinal = [withoutProof, clientProof].join(',');
const serverSignature = HMAC(cryptoMethod, serverKey, authMessage);
const saslContinueCmd = {
saslContinue: 1,
conversationId: r.conversationId,
payload: new Binary(Buffer.from(clientFinal))
};
sendAuthCommand(connection, `${db}.$cmd`, saslContinueCmd, (err, r) => {
if (err || (r && typeof r.ok === 'number' && r.ok === 0)) {
callback(err, r);
return;
}
const parsedResponse = parsePayload(r.payload.value());
if (!compareDigest(Buffer.from(parsedResponse.v, 'base64'), serverSignature)) {
callback(new MongoError('Server returned an invalid signature'));
return;
}
if (!r || r.done !== false) {
return callback(err, r);
}
const retrySaslContinueCmd = {
saslContinue: 1,
conversationId: r.conversationId,
payload: Buffer.alloc(0)
};
sendAuthCommand(connection, `${db}.$cmd`, retrySaslContinueCmd, callback);
});
});
}
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
// Create a random nonce
crypto.randomBytes(24, (err, buff) => {
if (err) {
return callback(err, null);
}
return this._executeScram(
sendAuthCommand,
connection,
credentials,
buff.toString('base64'),
callback
);
});
}
/**
* Authenticate
* @override
* @method
*/
auth(sendAuthCommand, connections, credentials, callback) {
this._checkSaslprep();
super.auth(sendAuthCommand, connections, credentials, callback);
}
_checkSaslprep() {
const cryptoMethod = this.cryptoMethod;
if (cryptoMethod === 'sha256') {
if (!saslprep) {
console.warn('Warning: no saslprep library specified. Passwords will not be sanitized');
}
}
}
}
/**
* Creates a new ScramSHA1 authentication mechanism
* @class
* @extends ScramSHA
*/
class ScramSHA1 extends ScramSHA {
constructor(bson) {
super(bson, 'sha1');
}
}
/**
* Creates a new ScramSHA256 authentication mechanism
* @class
* @extends ScramSHA
*/
class ScramSHA256 extends ScramSHA {
constructor(bson) {
super(bson, 'sha256');

View File

@@ -1,35 +1,26 @@
'use strict';
const AuthProvider = require('./auth_provider').AuthProvider;
/**
* Creates a new X509 authentication mechanism
* @class
* @extends AuthProvider
*/
class X509 extends AuthProvider {
prepare(handshakeDoc, authContext, callback) {
const credentials = authContext.credentials;
Object.assign(handshakeDoc, {
speculativeAuthenticate: x509AuthenticateCommand(credentials)
});
callback(undefined, handshakeDoc);
}
auth(authContext, callback) {
const connection = authContext.connection;
const credentials = authContext.credentials;
const response = authContext.response;
if (response.speculativeAuthenticate) {
return callback();
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const username = credentials.username;
const command = { authenticate: 1, mechanism: 'MONGODB-X509' };
if (username) {
command.user = username;
}
connection.command('$external.$cmd', x509AuthenticateCommand(credentials), callback);
sendAuthCommand(connection, '$external.$cmd', command, callback);
}
}
function x509AuthenticateCommand(credentials) {
const command = { authenticate: 1, mechanism: 'MONGODB-X509' };
if (credentials.username) {
Object.apply(command, { user: credentials.username });
}
return command;
}
module.exports = X509;

View File

@@ -2,11 +2,10 @@
const net = require('net');
const tls = require('tls');
const Connection = require('./connection');
const Query = require('./commands').Query;
const MongoError = require('../error').MongoError;
const MongoNetworkError = require('../error').MongoNetworkError;
const MongoNetworkTimeoutError = require('../error').MongoNetworkTimeoutError;
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;
const AuthContext = require('../auth/auth_provider').AuthContext;
const WIRE_CONSTANTS = require('../wireprotocol/constants');
const makeClientMetadata = require('../utils').makeClientMetadata;
const MAX_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_WIRE_VERSION;
@@ -38,7 +37,30 @@ function connect(options, cancellationToken, callback) {
}
function isModernConnectionType(conn) {
return !(conn instanceof Connection);
return typeof conn.command === 'function';
}
function getSaslSupportedMechs(options) {
if (!(options && options.credentials)) {
return {};
}
const credentials = options.credentials;
// TODO: revisit whether or not items like `options.user` and `options.dbName` should be checked here
const authMechanism = credentials.mechanism;
const authSource = credentials.source || options.dbName || 'admin';
const user = credentials.username || options.user;
if (typeof authMechanism === 'string' && authMechanism.toUpperCase() !== 'DEFAULT') {
return {};
}
if (!user) {
return {};
}
return { saslSupportedMechs: `${authSource}.${user}` };
}
function checkSupportedServer(ismaster, options) {
@@ -75,115 +97,77 @@ function performInitialHandshake(conn, options, _callback) {
_callback(err, ret);
};
const credentials = options.credentials;
if (credentials) {
if (!credentials.mechanism.match(/DEFAULT/i) && !AUTH_PROVIDERS[credentials.mechanism]) {
callback(new MongoError(`authMechanism '${credentials.mechanism}' not supported`));
return;
}
let compressors = [];
if (options.compression && options.compression.compressors) {
compressors = options.compression.compressors;
}
const authContext = new AuthContext(conn, credentials, options);
prepareHandshakeDocument(authContext, (err, handshakeDoc) => {
const handshakeDoc = Object.assign(
{
ismaster: true,
client: options.metadata || makeClientMetadata(options),
compression: compressors
},
getSaslSupportedMechs(options)
);
const handshakeOptions = Object.assign({}, options);
// The handshake technically is a monitoring check, so its socket timeout should be connectTimeoutMS
if (options.connectTimeoutMS || options.connectionTimeout) {
handshakeOptions.socketTimeout = options.connectTimeoutMS || options.connectionTimeout;
}
const start = new Date().getTime();
runCommand(conn, 'admin.$cmd', handshakeDoc, handshakeOptions, (err, ismaster) => {
if (err) {
return callback(err);
callback(err);
return;
}
const handshakeOptions = Object.assign({}, options);
if (options.connectTimeoutMS || options.connectionTimeout) {
// The handshake technically is a monitoring check, so its socket timeout should be connectTimeoutMS
handshakeOptions.socketTimeout = options.connectTimeoutMS || options.connectionTimeout;
if (ismaster.ok === 0) {
callback(new MongoError(ismaster));
return;
}
const start = new Date().getTime();
conn.command('admin.$cmd', handshakeDoc, handshakeOptions, (err, result) => {
if (err) {
callback(err);
return;
}
const supportedServerErr = checkSupportedServer(ismaster, options);
if (supportedServerErr) {
callback(supportedServerErr);
return;
}
const response = result.result;
if (response.ok === 0) {
callback(new MongoError(response));
return;
}
if (!isModernConnectionType(conn)) {
// resolve compression
if (ismaster.compression) {
const agreedCompressors = compressors.filter(
compressor => ismaster.compression.indexOf(compressor) !== -1
);
const supportedServerErr = checkSupportedServer(response, options);
if (supportedServerErr) {
callback(supportedServerErr);
return;
}
if (agreedCompressors.length) {
conn.agreedCompressor = agreedCompressors[0];
}
if (!isModernConnectionType(conn)) {
// resolve compression
if (response.compression) {
const agreedCompressors = handshakeDoc.compression.filter(
compressor => response.compression.indexOf(compressor) !== -1
);
if (agreedCompressors.length) {
conn.agreedCompressor = agreedCompressors[0];
}
if (options.compression && options.compression.zlibCompressionLevel) {
conn.zlibCompressionLevel = options.compression.zlibCompressionLevel;
}
if (options.compression && options.compression.zlibCompressionLevel) {
conn.zlibCompressionLevel = options.compression.zlibCompressionLevel;
}
}
}
// NOTE: This is metadata attached to the connection while porting away from
// handshake being done in the `Server` class. Likely, it should be
// relocated, or at very least restructured.
conn.ismaster = response;
conn.lastIsMasterMS = new Date().getTime() - start;
// NOTE: This is metadata attached to the connection while porting away from
// handshake being done in the `Server` class. Likely, it should be
// relocated, or at very least restructured.
conn.ismaster = ismaster;
conn.lastIsMasterMS = new Date().getTime() - start;
if (!response.arbiterOnly && credentials) {
// store the response on auth context
Object.assign(authContext, { response });
const resolvedCredentials = credentials.resolveAuthMechanism(response);
const authProvider = AUTH_PROVIDERS[resolvedCredentials.mechanism];
authProvider.auth(authContext, err => {
if (err) return callback(err);
callback(undefined, conn);
});
return;
}
callback(undefined, conn);
});
});
}
function prepareHandshakeDocument(authContext, callback) {
const options = authContext.options;
const compressors =
options.compression && options.compression.compressors ? options.compression.compressors : [];
const handshakeDoc = {
ismaster: true,
client: options.metadata || makeClientMetadata(options),
compression: compressors
};
const credentials = authContext.credentials;
if (credentials) {
if (credentials.mechanism.match(/DEFAULT/i) && credentials.username) {
Object.assign(handshakeDoc, {
saslSupportedMechs: `${credentials.source}.${credentials.username}`
});
AUTH_PROVIDERS['scram-sha-256'].prepare(handshakeDoc, authContext, callback);
const credentials = options.credentials;
if (!ismaster.arbiterOnly && credentials) {
credentials.resolveAuthMechanism(ismaster);
authenticate(conn, credentials, callback);
return;
}
const authProvider = AUTH_PROVIDERS[credentials.mechanism];
authProvider.prepare(handshakeDoc, authContext, callback);
return;
}
callback(undefined, handshakeDoc);
callback(undefined, conn);
});
}
const LEGAL_SSL_SOCKET_OPTIONS = [
@@ -255,7 +239,7 @@ function makeConnection(family, options, cancellationToken, _callback) {
const useSsl = typeof options.ssl === 'boolean' ? options.ssl : false;
const keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
let keepAliveInitialDelay =
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 120000;
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
const noDelay = typeof options.noDelay === 'boolean' ? options.noDelay : true;
const connectionTimeout =
typeof options.connectionTimeout === 'number'
@@ -334,12 +318,92 @@ function makeConnection(family, options, cancellationToken, _callback) {
socket.once(connectEvent, connectHandler);
}
const CONNECTION_ERROR_EVENTS = ['error', 'close', 'timeout', 'parseError'];
function runCommand(conn, ns, command, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
// are we using the new connection type? if so, no need to simulate a rpc `command` method
if (isModernConnectionType(conn)) {
conn.command(ns, command, options, (err, result) => {
if (err) {
callback(err);
return;
}
// NODE-2382: raw wire protocol messages, or command results should not be used anymore
callback(undefined, result.result);
});
return;
}
const socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
const bson = conn.options.bson;
const query = new Query(bson, ns, command, {
numberToSkip: 0,
numberToReturn: 1
});
const noop = () => {};
function _callback(err, result) {
callback(err, result);
callback = noop;
}
function errorHandler(err) {
conn.resetSocketTimeout();
CONNECTION_ERROR_EVENTS.forEach(eventName => conn.removeListener(eventName, errorHandler));
conn.removeListener('message', messageHandler);
if (err == null) {
err = new MongoError(`runCommand failed for connection to '${conn.address}'`);
}
// ignore all future errors
conn.on('error', noop);
_callback(err);
}
function messageHandler(msg) {
if (msg.responseTo !== query.requestId) {
return;
}
conn.resetSocketTimeout();
CONNECTION_ERROR_EVENTS.forEach(eventName => conn.removeListener(eventName, errorHandler));
conn.removeListener('message', messageHandler);
msg.parse({ promoteValues: true });
_callback(undefined, msg.documents[0]);
}
conn.setSocketTimeout(socketTimeout);
CONNECTION_ERROR_EVENTS.forEach(eventName => conn.once(eventName, errorHandler));
conn.on('message', messageHandler);
conn.write(query.toBin());
}
function authenticate(conn, credentials, callback) {
const mechanism = credentials.mechanism;
if (!AUTH_PROVIDERS[mechanism]) {
callback(new MongoError(`authMechanism '${mechanism}' not supported`));
return;
}
const provider = AUTH_PROVIDERS[mechanism];
provider.auth(runCommand, [conn], credentials, err => {
if (err) return callback(err);
callback(undefined, conn);
});
}
function connectionFailureError(type, err) {
switch (type) {
case 'error':
return new MongoNetworkError(err);
case 'timeout':
return new MongoNetworkTimeoutError(`connection timed out`);
return new MongoNetworkError(`connection timed out`);
case 'close':
return new MongoNetworkError(`connection closed`);
case 'cancel':

View File

@@ -8,15 +8,12 @@ const decompress = require('../wireprotocol/compression').decompress;
const Response = require('./commands').Response;
const BinMsg = require('./msg').BinMsg;
const MongoNetworkError = require('../error').MongoNetworkError;
const MongoNetworkTimeoutError = require('../error').MongoNetworkTimeoutError;
const MongoError = require('../error').MongoError;
const Logger = require('./logger');
const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED;
const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG;
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
const Buffer = require('safe-buffer').Buffer;
const Query = require('./commands').Query;
const CommandResult = require('./command_result');
let _id = 0;
@@ -67,7 +64,7 @@ class Connection extends EventEmitter {
* @param {string} [options.host='localhost'] The host the socket is connected to
* @param {number} [options.port=27017] The port used for the socket connection
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=120000] Initial delay before TCP keep alive enabled
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
* @param {boolean} [options.promoteLongs] Convert Long values from the db into Numbers if they fit into 53 bits
@@ -97,7 +94,7 @@ class Connection extends EventEmitter {
// These values are inspected directly in tests, but maybe not necessary to keep around
this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
this.keepAliveInitialDelay =
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 120000;
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
this.connectionTimeout =
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000;
if (this.keepAliveInitialDelay > this.socketTimeout) {
@@ -308,71 +305,8 @@ class Connection extends EventEmitter {
if (this.destroyed) return false;
return !this.socket.destroyed && this.socket.writable;
}
/**
* @ignore
*/
command(ns, command, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
const conn = this;
const socketTimeout =
typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
const bson = conn.options.bson;
const query = new Query(bson, ns, command, {
numberToSkip: 0,
numberToReturn: 1
});
const noop = () => {};
function _callback(err, result) {
callback(err, result);
callback = noop;
}
function errorHandler(err) {
conn.resetSocketTimeout();
CONNECTION_ERROR_EVENTS.forEach(eventName => conn.removeListener(eventName, errorHandler));
conn.removeListener('message', messageHandler);
if (err == null) {
err = new MongoError(`runCommand failed for connection to '${conn.address}'`);
}
// ignore all future errors
conn.on('error', noop);
_callback(err);
}
function messageHandler(msg) {
if (msg.responseTo !== query.requestId) {
return;
}
conn.resetSocketTimeout();
CONNECTION_ERROR_EVENTS.forEach(eventName => conn.removeListener(eventName, errorHandler));
conn.removeListener('message', messageHandler);
msg.parse({ promoteValues: true });
const response = msg.documents[0];
if (response.ok === 0 || response.$err || response.errmsg || response.code) {
_callback(new MongoError(response));
return;
}
_callback(undefined, new CommandResult(response, this, msg));
}
conn.setSocketTimeout(socketTimeout);
CONNECTION_ERROR_EVENTS.forEach(eventName => conn.once(eventName, errorHandler));
conn.on('message', messageHandler);
conn.write(query.toBin());
}
}
const CONNECTION_ERROR_EVENTS = ['error', 'close', 'timeout', 'parseError'];
function deleteConnection(id) {
// console.log("=== deleted connection " + id + " :: " + (connections[id] ? connections[id].port : ''))
delete connections[id];
@@ -418,9 +352,7 @@ function timeoutHandler(conn) {
conn.timedOut = true;
conn.emit(
'timeout',
new MongoNetworkTimeoutError(`connection ${conn.id} to ${conn.address} timed out`, {
beforeHandshake: conn.ismaster == null
}),
new MongoNetworkError(`connection ${conn.id} to ${conn.address} timed out`),
conn
);
};

View File

@@ -60,7 +60,7 @@ var _id = 0;
* @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
* @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=120000] Initial delay before TCP keep alive enabled
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
* @param {boolean} [options.noDelay=true] TCP Connection no delay
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
@@ -113,7 +113,7 @@ var Pool = function(topology, options) {
connectionTimeout: 30000,
socketTimeout: 360000,
keepAlive: true,
keepAliveInitialDelay: 120000,
keepAliveInitialDelay: 300000,
noDelay: true,
// SSL Settings
ssl: false,

View File

@@ -464,41 +464,50 @@ class CoreCursor extends Readable {
}
const result = r.message;
if (result.queryFailure) {
return done(new MongoError(result.documents[0]), null);
}
if (Array.isArray(result.documents) && result.documents.length === 1) {
const document = result.documents[0];
if (result.queryFailure) {
return done(new MongoError(document), null);
// Check if we have a command cursor
if (
Array.isArray(result.documents) &&
result.documents.length === 1 &&
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) &&
(typeof result.documents[0].cursor !== 'string' ||
result.documents[0]['$err'] ||
result.documents[0]['errmsg'] ||
Array.isArray(result.documents[0].result))
) {
// We have an error document, return the error
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
return done(new MongoError(result.documents[0]), null);
}
// Check if we have a command cursor
if (!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) {
// We have an error document, return the error
if (document.$err || document.errmsg) {
return done(new MongoError(document), null);
// We have a cursor document
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
const id = result.documents[0].cursor.id;
// If we have a namespace change set the new namespace for getmores
if (result.documents[0].cursor.ns) {
cursor.ns = result.documents[0].cursor.ns;
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = result.documents[0].operationTime;
// If we have a firstBatch set it
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
}
// We have a cursor document
if (document.cursor != null && typeof document.cursor !== 'string') {
const id = document.cursor.id;
// If we have a namespace change set the new namespace for getmores
if (document.cursor.ns) {
cursor.ns = document.cursor.ns;
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = document.operationTime;
// Return after processing command cursor
return done(null, result);
}
// If we have a firstBatch set it
if (Array.isArray(document.cursor.firstBatch)) {
cursor.cursorState.documents = document.cursor.firstBatch; //.reverse();
}
// Return after processing command cursor
return done(null, result);
}
if (Array.isArray(result.documents[0].result)) {
cursor.cursorState.documents = result.documents[0].result;
cursor.cursorState.cursorId = Long.ZERO;
return done(null, result);
}
}

View File

@@ -1,7 +1,5 @@
'use strict';
const kErrorLabels = Symbol('errorLabels');
/**
* Creates a new MongoError
*
@@ -20,12 +18,8 @@ class MongoError extends Error {
super(message);
} else {
super(message.message || message.errmsg || message.$err || 'n/a');
if (message.errorLabels) {
this[kErrorLabels] = new Set(message.errorLabels);
}
for (var name in message) {
if (name === 'errorLabels' || name === 'errmsg') {
if (name === 'errmsg') {
continue;
}
@@ -63,29 +57,8 @@ class MongoError extends Error {
* @returns {boolean} returns true if the error has the provided error label
*/
hasErrorLabel(label) {
if (this[kErrorLabels] == null) {
return false;
}
return this[kErrorLabels].has(label);
return this.errorLabels && this.errorLabels.indexOf(label) !== -1;
}
addErrorLabel(label) {
if (this[kErrorLabels] == null) {
this[kErrorLabels] = new Set();
}
this[kErrorLabels].add(label);
}
get errorLabels() {
return this[kErrorLabels] ? Array.from(this[kErrorLabels]) : [];
}
}
const kBeforeHandshake = Symbol('beforeHandshake');
function isNetworkErrorBeforeHandshake(err) {
return err[kBeforeHandshake] === true;
}
/**
@@ -98,28 +71,9 @@ function isNetworkErrorBeforeHandshake(err) {
* @extends MongoError
*/
class MongoNetworkError extends MongoError {
constructor(message, options) {
constructor(message) {
super(message);
this.name = 'MongoNetworkError';
if (options && options.beforeHandshake === true) {
this[kBeforeHandshake] = true;
}
}
}
/**
* An error indicating a network timeout occurred
*
* @param {Error|string|object} message The error message
* @property {string} message The error message
* @property {object} [options.beforeHandshake] Indicates the timeout happened before a connection handshake completed
* @extends MongoError
*/
class MongoNetworkTimeoutError extends MongoNetworkError {
constructor(message, options) {
super(message, options);
this.name = 'MongoNetworkTimeoutError';
}
}
@@ -204,10 +158,6 @@ class MongoWriteConcernError extends MongoError {
super(message);
this.name = 'MongoWriteConcernError';
if (result && Array.isArray(result.errorLabels)) {
this[kErrorLabels] = new Set(result.errorLabels);
}
if (result != null) {
this.result = makeWriteConcernResultObject(result);
}
@@ -229,32 +179,6 @@ const RETRYABLE_ERROR_CODES = new Set([
13436 // NotMasterOrSecondary
]);
const RETRYABLE_WRITE_ERROR_CODES = new Set([
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
10107, // NotMaster
13435, // NotMasterNoSlaveOk
13436, // NotMasterOrSecondary
189, // PrimarySteppedDown
91, // ShutdownInProgress
7, // HostNotFound
6, // HostUnreachable
89, // NetworkTimeout
9001, // SocketException
262 // ExceededTimeLimit
]);
function isRetryableWriteError(error) {
if (error instanceof MongoWriteConcernError) {
return (
RETRYABLE_WRITE_ERROR_CODES.has(error.code) ||
RETRYABLE_WRITE_ERROR_CODES.has(error.result.code)
);
}
return RETRYABLE_WRITE_ERROR_CODES.has(error.code);
}
/**
* Determines whether an error is something the driver should attempt to retry
*
@@ -335,10 +259,13 @@ function isSDAMUnrecoverableError(error) {
return false;
}
function isNetworkTimeoutError(err) {
return err instanceof MongoNetworkError && err.message.match(/timed out/);
}
module.exports = {
MongoError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoParseError,
MongoTimeoutError,
MongoServerSelectionError,
@@ -346,6 +273,5 @@ module.exports = {
isRetryableError,
isSDAMUnrecoverableError,
isNodeShuttingDownError,
isRetryableWriteError,
isNetworkErrorBeforeHandshake
isNetworkTimeoutError
};

View File

@@ -28,13 +28,6 @@ const ServerType = {
Unknown: 'Unknown'
};
// helper to get a server's type that works for both legacy and unified topologies
function serverType(server) {
let description = server.s.description || server.s.serverDescription;
if (description.topologyType === TopologyType.Single) return description.servers[0].type;
return description.type;
}
const TOPOLOGY_DEFAULTS = {
useUnifiedTopology: true,
localThresholdMS: 15,
@@ -61,7 +54,6 @@ module.exports = {
TOPOLOGY_DEFAULTS,
TopologyType,
ServerType,
serverType,
drainTimerQueue,
clearAndRemoveTimerFrom
};

View File

@@ -6,8 +6,7 @@ const connect = require('../connection/connect');
const Connection = require('../../cmap/connection').Connection;
const common = require('./common');
const makeStateMachine = require('../utils').makeStateMachine;
const MongoNetworkError = require('../error').MongoNetworkError;
const BSON = require('../connection/utils').retrieveBSON();
const MongoError = require('../error').MongoError;
const makeInterruptableAsyncInterval = require('../../utils').makeInterruptableAsyncInterval;
const calculateDurationInMs = require('../../utils').calculateDurationInMs;
const now = require('../../utils').now;
@@ -21,15 +20,13 @@ const kServer = Symbol('server');
const kMonitorId = Symbol('monitorId');
const kConnection = Symbol('connection');
const kCancellationToken = Symbol('cancellationToken');
const kRTTPinger = Symbol('rttPinger');
const kRoundTripTime = Symbol('roundTripTime');
const STATE_CLOSED = common.STATE_CLOSED;
const STATE_CLOSING = common.STATE_CLOSING;
const STATE_IDLE = 'idle';
const STATE_MONITORING = 'monitoring';
const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_IDLE, STATE_CLOSED],
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED],
[STATE_CLOSED]: [STATE_CLOSED, STATE_MONITORING],
[STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, STATE_CLOSING],
[STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, STATE_CLOSING]
@@ -69,29 +66,28 @@ class Monitor extends EventEmitter {
});
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
const connectOptions = Object.assign(
{
id: '<monitor>',
host: server.description.host,
port: server.description.port,
bson: server.s.bson,
connectionType: Connection
},
server.s.options,
this.options,
const addressParts = server.description.address.split(':');
this.connectOptions = Object.freeze(
Object.assign(
{
id: '<monitor>',
host: addressParts[0],
port: parseInt(addressParts[1], 10),
bson: server.s.bson,
connectionType: Connection
},
server.s.options,
this.options,
// force BSON serialization options
{
raw: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: true
}
// force BSON serialization options
{
raw: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: true
}
)
);
// ensure no authentication is used for monitoring
delete connectOptions.credentials;
this.connectOptions = Object.freeze(connectOptions);
}
connect() {
@@ -117,165 +113,88 @@ class Monitor extends EventEmitter {
this[kMonitorId].wake();
}
reset() {
if (isInCloseState(this)) {
return;
}
stateTransition(this, STATE_CLOSING);
resetMonitorState(this);
// restart monitor
stateTransition(this, STATE_IDLE);
// restart monitoring
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS
});
}
close() {
if (isInCloseState(this)) {
return;
}
stateTransition(this, STATE_CLOSING);
resetMonitorState(this);
this[kCancellationToken].emit('cancel');
if (this[kMonitorId]) {
this[kMonitorId].stop();
this[kMonitorId] = null;
}
if (this[kConnection]) {
this[kConnection].destroy({ force: true });
}
// close monitor
this.emit('close');
stateTransition(this, STATE_CLOSED);
}
}
function resetMonitorState(monitor) {
stateTransition(monitor, STATE_CLOSING);
if (monitor[kMonitorId]) {
monitor[kMonitorId].stop();
monitor[kMonitorId] = null;
}
if (monitor[kRTTPinger]) {
monitor[kRTTPinger].close();
monitor[kRTTPinger] = undefined;
}
monitor[kCancellationToken].emit('cancel');
if (monitor[kMonitorId]) {
clearTimeout(monitor[kMonitorId]);
monitor[kMonitorId] = undefined;
}
if (monitor[kConnection]) {
monitor[kConnection].destroy({ force: true });
}
}
function checkServer(monitor, callback) {
let start = now();
if (monitor[kConnection] && monitor[kConnection].closed) {
monitor[kConnection] = undefined;
}
const start = now();
monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address));
function failureHandler(err) {
if (monitor[kConnection]) {
monitor[kConnection].destroy({ force: true });
monitor[kConnection] = undefined;
}
monitor.emit(
'serverHeartbeatFailed',
new ServerHeartbeatFailedEvent(calculateDurationInMs(start), err, monitor.address)
);
monitor.emit('resetServer', err);
monitor.emit('resetConnectionPool');
callback(err);
}
if (monitor[kConnection] != null && !monitor[kConnection].closed) {
function successHandler(isMaster) {
monitor.emit(
'serverHeartbeatSucceeded',
new ServerHeartbeatSucceededEvent(calculateDurationInMs(start), isMaster, monitor.address)
);
return callback(undefined, isMaster);
}
if (monitor[kConnection] != null) {
const connectTimeoutMS = monitor.options.connectTimeoutMS;
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = topologyVersion != null;
const cmd = isAwaitable
? { ismaster: true, maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
: { ismaster: true };
const options = isAwaitable
? { socketTimeout: connectTimeoutMS + maxAwaitTimeMS, exhaustAllowed: true }
: { socketTimeout: connectTimeoutMS };
if (isAwaitable && monitor[kRTTPinger] == null) {
monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], monitor.connectOptions);
}
monitor[kConnection].command('admin.$cmd', cmd, options, (err, result) => {
if (err) {
failureHandler(err);
return;
}
const isMaster = result.result;
const duration = isAwaitable
? monitor[kRTTPinger].roundTripTime
: calculateDurationInMs(start);
monitor.emit(
'serverHeartbeatSucceeded',
new ServerHeartbeatSucceededEvent(duration, isMaster, monitor.address)
);
// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
if (isAwaitable && isMaster.topologyVersion) {
monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address));
start = now();
} else {
if (monitor[kRTTPinger]) {
monitor[kRTTPinger].close();
monitor[kRTTPinger] = undefined;
monitor[kConnection].command(
'admin.$cmd',
{ ismaster: true },
{ socketTimeout: connectTimeoutMS },
(err, result) => {
if (err) {
failureHandler(err);
return;
}
callback(undefined, isMaster);
successHandler(result.result);
}
});
);
return;
}
// connecting does an implicit `ismaster`
connect(monitor.connectOptions, monitor[kCancellationToken], (err, conn) => {
if (conn && isInCloseState(monitor)) {
conn.destroy({ force: true });
return;
}
if (err) {
monitor[kConnection] = undefined;
// we already reset the connection pool on network errors in all cases
if (!(err instanceof MongoNetworkError)) {
monitor.emit('resetConnectionPool');
}
failureHandler(err);
return;
}
monitor[kConnection] = conn;
monitor.emit(
'serverHeartbeatSucceeded',
new ServerHeartbeatSucceededEvent(
calculateDurationInMs(start),
conn.ismaster,
monitor.address
)
);
if (isInCloseState(monitor)) {
conn.destroy({ force: true });
failureHandler(new MongoError('monitor was destroyed'));
return;
}
callback(undefined, conn.ismaster);
monitor[kConnection] = conn;
successHandler(conn.ismaster);
});
}
@@ -293,113 +212,33 @@ function monitorServer(monitor) {
// TODO: the next line is a legacy event, remove in v4
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
checkServer(monitor, (err, isMaster) => {
if (err) {
// otherwise an error occured on initial discovery, also bail
if (monitor[kServer].description.type === ServerType.Unknown) {
monitor.emit('resetServer', err);
return done();
checkServer(monitor, e0 => {
if (e0 == null) {
return done();
}
// otherwise an error occured on initial discovery, also bail
if (monitor[kServer].description.type === ServerType.Unknown) {
monitor.emit('resetServer', e0);
return done();
}
// According to the SDAM specification's "Network error during server check" section, if
// an ismaster call fails we reset the server's pool. If a server was once connected,
// change its type to `Unknown` only after retrying once.
monitor.emit('resetConnectionPool');
checkServer(monitor, e1 => {
if (e1) {
monitor.emit('resetServer', e1);
}
}
// if the check indicates streaming is supported, immediately reschedule monitoring
if (isMaster && isMaster.topologyVersion) {
setTimeout(() => {
if (!isInCloseState(monitor)) {
monitor[kMonitorId].wake();
}
});
}
done();
done();
});
});
};
}
function makeTopologyVersion(tv) {
return {
processId: tv.processId,
counter: BSON.Long.fromNumber(tv.counter)
};
}
class RTTPinger {
constructor(cancellationToken, options) {
this[kConnection] = null;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
}
get roundTripTime() {
return this[kRoundTripTime];
}
close() {
this.closed = true;
clearTimeout(this[kMonitorId]);
this[kMonitorId] = undefined;
if (this[kConnection]) {
this[kConnection].destroy({ force: true });
}
}
}
function measureRoundTripTime(rttPinger, options) {
const start = now();
const cancellationToken = rttPinger[kCancellationToken];
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
if (rttPinger.closed) {
return;
}
function measureAndReschedule(conn) {
if (rttPinger.closed) {
conn.destroy({ force: true });
return;
}
if (rttPinger[kConnection] == null) {
rttPinger[kConnection] = conn;
}
rttPinger[kRoundTripTime] = calculateDurationInMs(start);
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
heartbeatFrequencyMS
);
}
if (rttPinger[kConnection] == null) {
connect(options, cancellationToken, (err, conn) => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
measureAndReschedule(conn);
});
return;
}
rttPinger[kConnection].command('admin.$cmd', { ismaster: 1 }, err => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
measureAndReschedule();
});
}
module.exports = {
Monitor
};

View File

@@ -7,22 +7,17 @@ const relayEvents = require('../utils').relayEvents;
const BSON = require('../connection/utils').retrieveBSON();
const Logger = require('../connection/logger');
const ServerDescription = require('./server_description').ServerDescription;
const compareTopologyVersion = require('./server_description').compareTopologyVersion;
const ReadPreference = require('../topologies/read_preference');
const Monitor = require('./monitor').Monitor;
const MongoNetworkError = require('../error').MongoNetworkError;
const MongoNetworkTimeoutError = require('../error').MongoNetworkTimeoutError;
const collationNotSupported = require('../utils').collationNotSupported;
const debugOptions = require('../connection/utils').debugOptions;
const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError;
const isRetryableWriteError = require('../error').isRetryableWriteError;
const isNetworkTimeoutError = require('../error').isNetworkTimeoutError;
const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
const isNetworkErrorBeforeHandshake = require('../error').isNetworkErrorBeforeHandshake;
const maxWireVersion = require('../utils').maxWireVersion;
const makeStateMachine = require('../utils').makeStateMachine;
const common = require('./common');
const ServerType = common.ServerType;
const isTransactionCommand = require('../transactions').isTransactionCommand;
// Used for filtering out fields for logging
const DEBUG_FIELDS = [
@@ -115,8 +110,9 @@ class Server extends EventEmitter {
// create the connection pool
// NOTE: this used to happen in `connect`, we supported overriding pool options there
const addressParts = this.description.address.split(':');
const poolOptions = Object.assign(
{ host: this.description.host, port: this.description.port, bson: this.s.bson },
{ host: addressParts[0], port: parseInt(addressParts[1], 10), bson: this.s.bson },
options
);
@@ -282,7 +278,7 @@ class Server extends EventEmitter {
return cb(err);
}
conn.command(ns, cmd, options, makeOperationHandler(this, conn, cmd, options, cb));
conn.command(ns, cmd, options, makeOperationHandler(this, options, cb));
}, callback);
}
@@ -306,7 +302,7 @@ class Server extends EventEmitter {
return cb(err);
}
conn.query(ns, cmd, cursorState, options, makeOperationHandler(this, conn, cmd, options, cb));
conn.query(ns, cmd, cursorState, options, makeOperationHandler(this, options, cb));
}, callback);
}
@@ -330,13 +326,7 @@ class Server extends EventEmitter {
return cb(err);
}
conn.getMore(
ns,
cursorState,
batchSize,
options,
makeOperationHandler(this, conn, null, options, cb)
);
conn.getMore(ns, cursorState, batchSize, options, makeOperationHandler(this, options, cb));
}, callback);
}
@@ -362,7 +352,7 @@ class Server extends EventEmitter {
return cb(err);
}
conn.killCursors(ns, cursorState, makeOperationHandler(this, conn, null, undefined, cb));
conn.killCursors(ns, cursorState, makeOperationHandler(this, null, cb));
}, callback);
}
@@ -424,14 +414,6 @@ Object.defineProperty(Server.prototype, 'clusterTime', {
}
});
function supportsRetryableWrites(server) {
return (
server.description.maxWireVersion >= 6 &&
server.description.logicalSessionTimeoutMinutes &&
server.description.type !== ServerType.Standalone
);
}
function calculateRoundTripTime(oldRtt, duration) {
if (oldRtt === -1) {
return duration;
@@ -466,13 +448,6 @@ function executeWriteOperation(args, options, callback) {
callback(new MongoError(`server ${server.name} does not support collation`));
return;
}
const unacknowledgedWrite = options.writeConcern && options.writeConcern.w === 0;
if (unacknowledgedWrite || maxWireVersion(server) < 5) {
if ((op === 'update' || op === 'remove') && ops.find(o => o.hint)) {
callback(new MongoError(`servers < 3.4 do not support hint on ${op}`));
return;
}
}
server.s.pool.withConnection((err, conn, cb) => {
if (err) {
@@ -480,78 +455,38 @@ function executeWriteOperation(args, options, callback) {
return cb(err);
}
conn[op](ns, ops, options, makeOperationHandler(server, conn, ops, options, cb));
conn[op](ns, ops, options, makeOperationHandler(server, options, cb));
}, callback);
}
function markServerUnknown(server, error) {
if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
server[kMonitor].reset();
}
server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, {
error,
topologyVersion:
error && error.topologyVersion ? error.topologyVersion : server.description.topologyVersion
})
new ServerDescription(server.description.address, null, { error })
);
}
function connectionIsStale(pool, connection) {
return connection.generation !== pool.generation;
}
function shouldHandleStateChangeError(server, err) {
const etv = err.topologyVersion;
const stv = server.description.topologyVersion;
return compareTopologyVersion(stv, etv) < 0;
}
function inActiveTransaction(session, cmd) {
return session && session.inTransaction() && !isTransactionCommand(cmd);
}
function makeOperationHandler(server, connection, cmd, options, callback) {
function makeOperationHandler(server, options, callback) {
const session = options && options.session;
return function handleOperationResult(err, result) {
if (err && !connectionIsStale(server.s.pool, connection)) {
if (err) {
if (err instanceof MongoNetworkError) {
if (session && !session.hasEnded) {
session.serverSession.isDirty = true;
}
if (supportsRetryableWrites(server) && !inActiveTransaction(session, cmd)) {
err.addErrorLabel('RetryableWriteError');
}
if (!(err instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(err)) {
if (!isNetworkTimeoutError(err)) {
markServerUnknown(server, err);
server.s.pool.clear();
}
} else {
// if pre-4.4 server, then add error label if its a retryable write error
if (
maxWireVersion(server) < 9 &&
isRetryableWriteError(err) &&
!inActiveTransaction(session, cmd)
) {
err.addErrorLabel('RetryableWriteError');
} else if (isSDAMUnrecoverableError(err)) {
if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) {
server.s.pool.clear();
}
if (isSDAMUnrecoverableError(err)) {
if (shouldHandleStateChangeError(server, err)) {
if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) {
server.s.pool.clear();
}
markServerUnknown(server, err);
process.nextTick(() => server.requestCheck());
}
}
markServerUnknown(server, err);
process.nextTick(() => server.requestCheck());
}
}

View File

@@ -53,8 +53,6 @@ class ServerDescription {
* @param {Object} [ismaster] An optional ismaster response for this server
* @param {Object} [options] Optional settings
* @param {Number} [options.roundTripTime] The round trip time to ping this server (in ms)
* @param {Error} [options.error] An Error used for better reporting debugging
* @param {any} [options.topologyVersion] The topologyVersion
*/
constructor(address, ismaster, options) {
options = options || {};
@@ -77,7 +75,6 @@ class ServerDescription {
this.lastWriteDate = ismaster.lastWrite ? ismaster.lastWrite.lastWriteDate : null;
this.opTime = ismaster.lastWrite ? ismaster.lastWrite.opTime : null;
this.type = parseServerType(ismaster);
this.topologyVersion = options.topologyVersion || ismaster.topologyVersion;
// direct mappings
ISMASTER_FIELDS.forEach(field => {
@@ -116,16 +113,6 @@ class ServerDescription {
return WRITABLE_SERVER_TYPES.has(this.type);
}
get host() {
const chopLength = `:${this.port}`.length;
return this.address.slice(0, -chopLength);
}
get port() {
const port = this.address.split(':').pop();
return port ? Number.parseInt(port, 10) : port;
}
/**
* Determines if another `ServerDescription` is equal to this one per the rules defined
* in the {@link https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#serverdescription|SDAM spec}
@@ -134,10 +121,6 @@ class ServerDescription {
* @return {Boolean}
*/
equals(other) {
const topologyVersionsEqual =
this.topologyVersion === other.topologyVersion ||
compareTopologyVersion(this.topologyVersion, other.topologyVersion) === 0;
return (
other != null &&
errorStrictEqual(this.error, other.error) &&
@@ -152,8 +135,7 @@ class ServerDescription {
? other.electionId && this.electionId.equals(other.electionId)
: this.electionId === other.electionId) &&
this.primary === other.primary &&
this.logicalSessionTimeoutMinutes === other.logicalSessionTimeoutMinutes &&
topologyVersionsEqual
this.logicalSessionTimeoutMinutes === other.logicalSessionTimeoutMinutes
);
}
}
@@ -194,34 +176,7 @@ function parseServerType(ismaster) {
return ServerType.Standalone;
}
/**
* Compares two topology versions.
*
* @param {object} lhs
* @param {object} rhs
* @returns A negative number if `lhs` is older than `rhs`; positive if `lhs` is newer than `rhs`; 0 if they are equivalent.
*/
function compareTopologyVersion(lhs, rhs) {
if (lhs == null || rhs == null) {
return -1;
}
if (lhs.processId.equals(rhs.processId)) {
// TODO: handle counters as Longs
if (lhs.counter === rhs.counter) {
return 0;
} else if (lhs.counter < rhs.counter) {
return -1;
}
return 1;
}
return -1;
}
module.exports = {
ServerDescription,
parseServerType,
compareTopologyVersion
parseServerType
};

View File

@@ -14,6 +14,7 @@ const CoreCursor = require('../cursor').CoreCursor;
const deprecate = require('util').deprecate;
const BSON = require('../connection/utils').retrieveBSON();
const createCompressionInfo = require('../topologies/shared').createCompressionInfo;
const isRetryableError = require('../error').isRetryableError;
const ClientSession = require('../sessions').ClientSession;
const MongoError = require('../error').MongoError;
const MongoServerSelectionError = require('../error').MongoServerSelectionError;
@@ -26,7 +27,6 @@ const emitDeprecationWarning = require('../../utils').emitDeprecationWarning;
const ServerSessionPool = require('../sessions').ServerSessionPool;
const makeClientMetadata = require('../utils').makeClientMetadata;
const CMAP_EVENT_NAMES = require('../../cmap/events').CMAP_EVENT_NAMES;
const compareTopologyVersion = require('./server_description').compareTopologyVersion;
const common = require('./common');
const drainTimerQueue = common.drainTimerQueue;
@@ -275,9 +275,9 @@ class Topology extends EventEmitter {
// connect all known servers, then attempt server selection to connect
connectServers(this, Array.from(this.s.description.servers.values()));
ReadPreference.translate(options);
translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
const connectHandler = err => {
this.selectServer(readPreferenceServerSelector(readPreference), options, err => {
if (err) {
this.close();
@@ -295,15 +295,7 @@ class Topology extends EventEmitter {
this.emit('connect', this);
if (typeof callback === 'function') callback(err, this);
};
// TODO: NODE-2471
if (this.s.credentials) {
this.command('admin.$cmd', { ping: 1 }, { readPreference }, connectHandler);
return;
}
this.selectServer(readPreferenceServerSelector(readPreference), options, connectHandler);
});
}
/**
@@ -389,7 +381,7 @@ class Topology extends EventEmitter {
} else if (typeof selector === 'string') {
readPreference = new ReadPreference(selector);
} else {
ReadPreference.translate(options);
translateReadPreference(options);
readPreference = options.readPreference || ReadPreference.primary;
}
@@ -513,11 +505,6 @@ class Topology extends EventEmitter {
return;
}
// ignore this server update if its from an outdated topologyVersion
if (isStaleServerDescription(this.s.description, serverDescription)) {
return;
}
// these will be used for monitoring events later
const previousTopologyDescription = this.s.description;
const previousServerDescription = this.s.description.servers.get(serverDescription.address);
@@ -660,7 +647,7 @@ class Topology extends EventEmitter {
(callback = options), (options = {}), (options = options || {});
}
ReadPreference.translate(options);
translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
@@ -679,7 +666,7 @@ class Topology extends EventEmitter {
const cb = (err, result) => {
if (!err) return callback(null, result);
if (!shouldRetryOperation(err)) {
if (!isRetryableError(err)) {
return callback(err);
}
@@ -721,7 +708,7 @@ class Topology extends EventEmitter {
options = options || {};
const topology = options.topology || this;
const CursorClass = options.cursorFactory || this.s.Cursor;
ReadPreference.translate(options);
translateReadPreference(options);
return new CursorClass(topology, ns, cmd, options);
}
@@ -784,16 +771,6 @@ function isWriteCommand(command) {
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
}
function isStaleServerDescription(topologyDescription, incomingServerDescription) {
const currentServerDescription = topologyDescription.servers.get(
incomingServerDescription.address
);
const currentTopologyVersion = currentServerDescription.topologyVersion;
return (
compareTopologyVersion(currentTopologyVersion, incomingServerDescription.topologyVersion) > 0
);
}
/**
* Destroys a server, and removes all event listeners from the instance
*
@@ -829,16 +806,10 @@ function parseStringSeedlist(seedlist) {
}
function topologyTypeFromSeedlist(seedlist, options) {
if (options.directConnection) {
return TopologyType.Single;
}
const replicaSet = options.replicaSet || options.setName || options.rs_name;
if (replicaSet == null) {
return TopologyType.Unknown;
}
return TopologyType.ReplicaSetNoPrimary;
if (seedlist.length === 1 && !replicaSet) return TopologyType.Single;
if (replicaSet) return TopologyType.ReplicaSetNoPrimary;
return TopologyType.Unknown;
}
function randomSelection(array) {
@@ -940,7 +911,7 @@ function executeWriteOperation(args, options, callback) {
const handler = (err, result) => {
if (!err) return callback(null, result);
if (!shouldRetryOperation(err)) {
if (!isRetryableError(err)) {
err = getMMAPError(err);
return callback(err);
}
@@ -968,8 +939,26 @@ function executeWriteOperation(args, options, callback) {
});
}
function shouldRetryOperation(err) {
return err instanceof MongoError && err.hasErrorLabel('RetryableWriteError');
function translateReadPreference(options) {
if (options.readPreference == null) {
return;
}
let r = options.readPreference;
if (typeof r === 'string') {
options.readPreference = new ReadPreference(r);
} else if (r && !(r instanceof ReadPreference) && typeof r === 'object') {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
});
}
} else if (!(r instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + r);
}
return options;
}
function srvPollingHandler(topology) {

View File

@@ -132,10 +132,6 @@ class TopologyDescription {
let maxElectionId = this.maxElectionId;
let commonWireVersion = this.commonWireVersion;
if (serverDescription.setName && setName && serverDescription.setName !== setName) {
serverDescription = new ServerDescription(address, null);
}
const serverType = serverDescription.type;
let serverDescriptions = new Map(this.servers);
@@ -165,7 +161,7 @@ class TopologyDescription {
}
if (topologyType === TopologyType.Unknown) {
if (serverType === ServerType.Standalone && this.servers.size !== 1) {
if (serverType === ServerType.Standalone) {
serverDescriptions.delete(address);
} else {
topologyType = topologyTypeForServerType(serverType);
@@ -278,22 +274,8 @@ class TopologyDescription {
}
function topologyTypeForServerType(serverType) {
if (serverType === ServerType.Standalone) {
return TopologyType.Single;
}
if (serverType === ServerType.Mongos) {
return TopologyType.Sharded;
}
if (serverType === ServerType.RSPrimary) {
return TopologyType.ReplicaSetWithPrimary;
}
if (serverType === ServerType.RSGhost || serverType === ServerType.Unknown) {
return TopologyType.Unknown;
}
if (serverType === ServerType.Mongos) return TopologyType.Sharded;
if (serverType === ServerType.RSPrimary) return TopologyType.ReplicaSetWithPrimary;
return TopologyType.ReplicaSetNoPrimary;
}

View File

@@ -386,7 +386,10 @@ function attemptTransaction(session, startTime, fn, options) {
}
if (isMaxTimeMSExpiredError(err)) {
err.addErrorLabel('UnknownTransactionCommitResult');
if (err.errorLabels == null) {
err.errorLabels = [];
}
err.errorLabels.push('UnknownTransactionCommitResult');
}
throw err;
@@ -485,8 +488,17 @@ function endTransaction(session, commandName, callback) {
isRetryableError(e) ||
isMaxTimeMSExpiredError(e))
) {
if (e.errorLabels) {
const idx = e.errorLabels.indexOf('TransientTransactionError');
if (idx !== -1) {
e.errorLabels.splice(idx, 1);
}
} else {
e.errorLabels = [];
}
if (isUnknownTransactionCommitResult(e)) {
e.addErrorLabel('UnknownTransactionCommitResult');
e.errorLabels.push('UnknownTransactionCommitResult');
// per txns spec, must unpin session in this case
session.transaction.unpinServer();
@@ -673,12 +685,7 @@ function commandSupportsReadConcern(command, options) {
return true;
}
if (
command.mapReduce &&
options &&
options.out &&
(options.out.inline === 1 || options.out === 'inline')
) {
if (command.mapReduce && options.out && (options.out.inline === 1 || options.out === 'inline')) {
return true;
}
@@ -701,11 +708,6 @@ function applySession(session, command, options) {
return new MongoError('Cannot use a session that has ended');
}
// SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility
if (options && options.writeConcern && options.writeConcern.w === 0) {
return;
}
const serverSession = session.serverSession;
serverSession.lastUse = now();
command.lsid = serverSession.id;
@@ -713,7 +715,7 @@ function applySession(session, command, options) {
// first apply non-transaction-specific sessions data
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options.willRetryWrite;
const shouldApplyReadConcern = commandSupportsReadConcern(command, options);
const shouldApplyReadConcern = commandSupportsReadConcern(command);
if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);

View File

@@ -13,10 +13,10 @@ const cloneOptions = require('./shared').cloneOptions;
const SessionMixins = require('./shared').SessionMixins;
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
const relayEvents = require('../utils').relayEvents;
const isRetryableError = require('../error').isRetryableError;
const BSON = retrieveBSON();
const getMMAPError = require('./shared').getMMAPError;
const makeClientMetadata = require('../utils').makeClientMetadata;
const legacyIsRetryableWriteError = require('./shared').legacyIsRetryableWriteError;
/**
* @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
@@ -71,7 +71,7 @@ var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
* @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
* @param {number} [options.size=5] Server connection pool size
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=120000] Initial delay before TCP keep alive enabled
* @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
* @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
* @param {boolean} [options.noDelay=true] TCP Connection no delay
* @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
@@ -113,18 +113,6 @@ var Mongos = function(seedlist, options) {
// Get replSet Id
this.id = id++;
// deduplicate seedlist
if (Array.isArray(seedlist)) {
seedlist = seedlist.reduce((seeds, seed) => {
if (seeds.find(s => s.host === seed.host && s.port === seed.port)) {
return seeds;
}
seeds.push(seed);
return seeds;
}, []);
}
// Internal state
this.s = {
options: Object.assign({ metadata: makeClientMetadata(options) }, options),
@@ -923,7 +911,7 @@ function executeWriteOperation(args, options, callback) {
const handler = (err, result) => {
if (!err) return callback(null, result);
if (!legacyIsRetryableWriteError(err, self) || !willRetryWrite) {
if (!isRetryableError(err) || !willRetryWrite) {
err = getMMAPError(err);
return callback(err);
}
@@ -1119,7 +1107,7 @@ Mongos.prototype.command = function(ns, cmd, options, callback) {
const cb = (err, result) => {
if (!err) return callback(null, result);
if (!legacyIsRetryableWriteError(err, self)) {
if (!isRetryableError(err)) {
return callback(err);
}
@@ -1133,8 +1121,8 @@ Mongos.prototype.command = function(ns, cmd, options, callback) {
// increment and assign txnNumber
if (willRetryWrite) {
clonedOptions.session.incrementTransactionNumber();
clonedOptions.willRetryWrite = willRetryWrite;
options.session.incrementTransactionNumber();
options.willRetryWrite = willRetryWrite;
}
// Execute the command

View File

@@ -8,8 +8,6 @@
* @param {array} tags The tags object
* @param {object} [options] Additional read preference options
* @param {number} [options.maxStalenessSeconds] Max secondary read staleness in seconds, Minimum value is 90 seconds.
* @param {object} [options.hedge] Server mode in which the same query is dispatched in parallel to multiple replica set members.
* @param {boolean} [options.hedge.enabled] Explicitly enable or disable hedged reads.
* @see https://docs.mongodb.com/manual/core/read-preference/
* @return {ReadPreference}
*/
@@ -24,10 +22,7 @@ const ReadPreference = function(mode, tags, options) {
'ReadPreference tags must be an array, this will change in the next major version'
);
const tagsHasMaxStalenessSeconds = typeof tags.maxStalenessSeconds !== 'undefined';
const tagsHasHedge = typeof tags.hedge !== 'undefined';
const tagsHasOptions = tagsHasMaxStalenessSeconds || tagsHasHedge;
if (tagsHasOptions) {
if (typeof tags.maxStalenessSeconds !== 'undefined') {
// this is likely an options object
options = tags;
tags = undefined;
@@ -38,7 +33,6 @@ const ReadPreference = function(mode, tags, options) {
this.mode = mode;
this.tags = tags;
this.hedge = options && options.hedge;
options = options || {};
if (options.maxStalenessSeconds != null) {
@@ -61,10 +55,6 @@ const ReadPreference = function(mode, tags, options) {
if (this.maxStalenessSeconds) {
throw new TypeError('Primary read preference cannot be combined with maxStalenessSeconds');
}
if (this.hedge) {
throw new TypeError('Primary read preference cannot be combined with hedge');
}
}
};
@@ -101,19 +91,20 @@ const VALID_MODES = [
* @return {ReadPreference}
*/
ReadPreference.fromOptions = function(options) {
if (!options) return null;
const readPreference = options.readPreference;
if (!readPreference) return null;
const readPreferenceTags = options.readPreferenceTags;
const maxStalenessSeconds = options.maxStalenessSeconds;
if (readPreference == null) {
return null;
}
if (typeof readPreference === 'string') {
return new ReadPreference(readPreference, readPreferenceTags);
} else if (!(readPreference instanceof ReadPreference) && typeof readPreference === 'object') {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
return new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds || maxStalenessSeconds,
hedge: readPreference.hedge
maxStalenessSeconds: readPreference.maxStalenessSeconds
});
}
}
@@ -121,60 +112,6 @@ ReadPreference.fromOptions = function(options) {
return readPreference;
};
/**
* Resolves a read preference based on well-defined inheritance rules. This method will not only
* determine the read preference (if there is one), but will also ensure the returned value is a
* properly constructed instance of `ReadPreference`.
*
* @param {Collection|Db|MongoClient} parent The parent of the operation on which to determine the read
* preference, used for determining the inherited read preference.
* @param {object} options The options passed into the method, potentially containing a read preference
* @returns {(ReadPreference|null)} The resolved read preference
*/
ReadPreference.resolve = function(parent, options) {
options = options || {};
const session = options.session;
const inheritedReadPreference = parent && parent.readPreference;
let readPreference;
if (options.readPreference) {
readPreference = ReadPreference.fromOptions(options);
} else if (session && session.inTransaction() && session.transaction.options.readPreference) {
// The transactions read preference MUST override all other user configurable read preferences.
readPreference = session.transaction.options.readPreference;
} else if (inheritedReadPreference != null) {
readPreference = inheritedReadPreference;
} else {
readPreference = ReadPreference.primary;
}
return typeof readPreference === 'string' ? new ReadPreference(readPreference) : readPreference;
};
/**
* Replaces options.readPreference with a ReadPreference instance
*/
ReadPreference.translate = function(options) {
if (options.readPreference == null) return options;
const r = options.readPreference;
if (typeof r === 'string') {
options.readPreference = new ReadPreference(r);
} else if (r && !(r instanceof ReadPreference) && typeof r === 'object') {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
});
}
} else if (!(r instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + r);
}
return options;
};
/**
* Validate if a mode is legal
*
@@ -228,7 +165,6 @@ ReadPreference.prototype.toJSON = function() {
const readPreference = { mode: this.mode };
if (Array.isArray(this.tags)) readPreference.tags = this.tags;
if (this.maxStalenessSeconds) readPreference.maxStalenessSeconds = this.maxStalenessSeconds;
if (this.hedge) readPreference.hedge = this.hedge;
return readPreference;
};

View File

@@ -15,10 +15,10 @@ const Interval = require('./shared').Interval;
const SessionMixins = require('./shared').SessionMixins;
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
const relayEvents = require('../utils').relayEvents;
const isRetryableError = require('../error').isRetryableError;
const BSON = retrieveBSON();
const getMMAPError = require('./shared').getMMAPError;
const makeClientMetadata = require('../utils').makeClientMetadata;
const legacyIsRetryableWriteError = require('./shared').legacyIsRetryableWriteError;
const now = require('../../utils').now;
const calculateDurationInMs = require('../../utils').calculateDurationInMs;
@@ -72,7 +72,7 @@ var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
* @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
* @param {number} [options.size=5] Server connection pool size
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=120000] Initial delay before TCP keep alive enabled
* @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
* @param {boolean} [options.noDelay=true] TCP Connection no delay
* @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
* @param {number} [options.socketTimeout=0] TCP Socket timeout setting
@@ -1202,7 +1202,7 @@ function executeWriteOperation(args, options, callback) {
const handler = (err, result) => {
if (!err) return callback(null, result);
if (!legacyIsRetryableWriteError(err, self)) {
if (!isRetryableError(err)) {
err = getMMAPError(err);
return callback(err);
}
@@ -1365,7 +1365,7 @@ ReplSet.prototype.command = function(ns, cmd, options, callback) {
const cb = (err, result) => {
if (!err) return callback(null, result);
if (!legacyIsRetryableWriteError(err, self)) {
if (!isRetryableError(err)) {
return callback(err);
}

View File

@@ -72,7 +72,7 @@ function topologyId(server) {
* @param {number} options.port The server port
* @param {number} [options.size=5] Server connection pool size
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=120000] Initial delay before TCP keep alive enabled
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
* @param {boolean} [options.noDelay=true] TCP Connection no delay
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting

View File

@@ -2,9 +2,7 @@
const ReadPreference = require('./read_preference');
const TopologyType = require('../sdam/common').TopologyType;
const MongoError = require('../error').MongoError;
const isRetryableWriteError = require('../error').isRetryableWriteError;
const maxWireVersion = require('../utils').maxWireVersion;
const MongoNetworkError = require('../error').MongoNetworkError;
const MMAPv1_RETRY_WRITES_ERROR_CODE = 20;
/**
@@ -418,39 +416,18 @@ function getMMAPError(err) {
return newErr;
}
// NOTE: only used for legacy topology types
function legacyIsRetryableWriteError(err, topology) {
if (!(err instanceof MongoError)) {
return false;
}
// if pre-4.4 server, then add error label if its a retryable write error
if (
isRetryableWritesSupported(topology) &&
(err instanceof MongoNetworkError ||
(maxWireVersion(topology) < 9 && isRetryableWriteError(err)))
) {
err.addErrorLabel('RetryableWriteError');
}
return err.hasErrorLabel('RetryableWriteError');
}
module.exports = {
SessionMixins,
resolveClusterTime,
inquireServerState,
getTopologyType,
emitServerDescriptionChanged,
emitTopologyDescriptionChanged,
cloneOptions,
createCompressionInfo,
clone,
diff,
Interval,
Timeout,
isRetryableWritesSupported,
getMMAPError,
topologyType,
legacyIsRetryableWriteError
};
module.exports.SessionMixins = SessionMixins;
module.exports.resolveClusterTime = resolveClusterTime;
module.exports.inquireServerState = inquireServerState;
module.exports.getTopologyType = getTopologyType;
module.exports.emitServerDescriptionChanged = emitServerDescriptionChanged;
module.exports.emitTopologyDescriptionChanged = emitTopologyDescriptionChanged;
module.exports.cloneOptions = cloneOptions;
module.exports.createCompressionInfo = createCompressionInfo;
module.exports.clone = clone;
module.exports.diff = diff;
module.exports.Interval = Interval;
module.exports.Timeout = Timeout;
module.exports.isRetryableWritesSupported = isRetryableWritesSupported;
module.exports.getMMAPError = getMMAPError;
module.exports.topologyType = topologyType;

View File

@@ -37,10 +37,6 @@ function matchesParentDomain(srvAddress, parentDomain) {
function parseSrvConnectionString(uri, options, callback) {
const result = URL.parse(uri, true);
if (options.directConnection || options.directconnection) {
return callback(new MongoParseError('directConnection not supported with SRV URI'));
}
if (result.hostname.split('.').length < 3) {
return callback(new MongoParseError('URI does not have hostname, domain name and tld'));
}
@@ -175,7 +171,6 @@ const STRING_OPTIONS = new Set(['authsource', 'replicaset']);
// NOTE: this list exists in native already, if it is merged here we should deduplicate
const AUTH_MECHANISMS = new Set([
'GSSAPI',
'MONGODB-AWS',
'MONGODB-X509',
'MONGODB-CR',
'DEFAULT',
@@ -219,8 +214,7 @@ const CASE_TRANSLATION = {
tlscertificatekeyfile: 'tlsCertificateKeyFile',
tlscertificatekeyfilepassword: 'tlsCertificateKeyFilePassword',
wtimeout: 'wTimeoutMS',
j: 'journal',
directconnection: 'directConnection'
j: 'journal'
};
/**
@@ -263,9 +257,7 @@ function applyConnectionStringOption(obj, key, value, options) {
if (key === 'authmechanism' && !AUTH_MECHANISMS.has(value)) {
throw new MongoParseError(
`Value for authMechanism must be one of: ${Array.from(AUTH_MECHANISMS).join(
', '
)}, found: ${value}`
'Value for `authMechanism` must be one of: `DEFAULT`, `GSSAPI`, `PLAIN`, `MONGODB-X509`, `SCRAM-SHA-1`, `SCRAM-SHA-256`'
);
}
@@ -365,16 +357,6 @@ function applyAuthExpectations(parsed) {
parsed.auth = Object.assign({}, parsed.auth, { db: '$external' });
}
if (authMechanism === 'MONGODB-AWS') {
if (authSource != null && authSource !== '$external') {
throw new MongoParseError(
`Invalid source \`${authSource}\` for mechanism \`${authMechanism}\` specified.`
);
}
parsed.auth = Object.assign({}, parsed.auth, { db: '$external' });
}
if (authMechanism === 'MONGODB-X509') {
if (parsed.auth && parsed.auth.password != null) {
throw new MongoParseError(`Password not allowed for mechanism \`${authMechanism}\``);
@@ -570,6 +552,10 @@ function parseConnectionString(uri, options, callback) {
return callback(new MongoParseError('Invalid protocol provided'));
}
if (protocol === PROTOCOL_MONGODB_SRV) {
return parseSrvConnectionString(uri, options, callback);
}
const dbAndQuery = cap[4].split('?');
const db = dbAndQuery.length > 0 ? dbAndQuery[0] : null;
const query = dbAndQuery.length > 1 ? dbAndQuery[1] : null;
@@ -582,11 +568,6 @@ function parseConnectionString(uri, options, callback) {
}
parsedOptions = Object.assign({}, parsedOptions, options);
if (protocol === PROTOCOL_MONGODB_SRV) {
return parseSrvConnectionString(uri, parsedOptions, callback);
}
const auth = { username: null, password: null, db: db && db !== '' ? qs.unescape(db) : null };
if (parsedOptions.auth) {
// maintain support for legacy options passed into `MongoClient`
@@ -680,22 +661,6 @@ function parseConnectionString(uri, options, callback) {
return callback(new MongoParseError('No hostname or hostnames provided in connection string'));
}
const directConnection = !!parsedOptions.directConnection;
if (directConnection && hosts.length !== 1) {
// If the option is set to true, the driver MUST validate that there is exactly one host given
// in the host list in the URI, and fail client creation otherwise.
return callback(new MongoParseError('directConnection option requires exactly one host'));
}
// NOTE: this behavior will go away in v4.0, we will always auto discover there
if (
parsedOptions.directConnection == null &&
hosts.length === 1 &&
parsedOptions.replicaSet == null
) {
parsedOptions.directConnection = true;
}
const result = {
hosts: hosts,
auth: auth.db || auth.username ? auth : null,

View File

@@ -149,35 +149,6 @@ function eachAsync(arr, eachFn, callback) {
}
}
function eachAsyncSeries(arr, eachFn, callback) {
arr = arr || [];
let idx = 0;
let awaiting = arr.length;
if (awaiting === 0) {
callback();
return;
}
function eachCallback(err) {
idx++;
awaiting--;
if (err) {
callback(err);
return;
}
if (idx === arr.length && awaiting <= 0) {
callback();
return;
}
eachFn(arr[idx], eachCallback);
}
eachFn(arr[idx], eachCallback);
}
function isUnifiedTopology(topology) {
return topology.description != null;
}
@@ -286,7 +257,6 @@ module.exports = {
maxWireVersion,
isPromiseLike,
eachAsync,
eachAsyncSeries,
isUnifiedTopology,
arrayStrictEqual,
tagsStrictEqual,

View File

@@ -66,7 +66,12 @@ function _command(server, ns, cmd, options, callback) {
finalCmd.$clusterTime = clusterTime;
}
if (isSharded(server) && !shouldUseOpMsg && readPreference && readPreference.mode !== 'primary') {
if (
isSharded(server) &&
!shouldUseOpMsg &&
readPreference &&
readPreference.preference !== 'primary'
) {
finalCmd = {
$query: finalCmd,
$readPreference: readPreference.toJSON()
@@ -100,7 +105,10 @@ function _command(server, ns, cmd, options, callback) {
err instanceof MongoNetworkError &&
!err.hasErrorLabel('TransientTransactionError')
) {
err.addErrorLabel('TransientTransactionError');
if (err.errorLabels == null) {
err.errorLabels = [];
}
err.errorLabels.push('TransientTransactionError');
}
if (

View File

@@ -1,9 +1,9 @@
'use strict';
const MIN_SUPPORTED_SERVER_VERSION = '2.6';
const MAX_SUPPORTED_SERVER_VERSION = '4.4';
const MAX_SUPPORTED_SERVER_VERSION = '4.2';
const MIN_SUPPORTED_WIRE_VERSION = 2;
const MAX_SUPPORTED_WIRE_VERSION = 9;
const MAX_SUPPORTED_WIRE_VERSION = 8;
module.exports = {
MIN_SUPPORTED_SERVER_VERSION,

View File

@@ -100,10 +100,6 @@ function prepareFindCommand(server, ns, cmd, cursorState) {
sortValue = sortObject;
}
if (typeof cmd.allowDiskUse === 'boolean') {
findCmd.allowDiskUse = cmd.allowDiskUse;
}
if (cmd.sort) findCmd.sort = sortValue;
if (cmd.fields) findCmd.projection = cmd.fields;
if (cmd.hint) findCmd.hint = cmd.hint;