This commit is contained in:
Jonasz Bigda
2023-03-25 21:51:42 +01:00
parent 0db1d5117e
commit b332e9ceb0
1044 changed files with 37502 additions and 63938 deletions

View File

@@ -4,6 +4,7 @@ const EventEmitter = require('events');
const MessageStream = require('./message_stream');
const MongoError = require('../core/error').MongoError;
const MongoNetworkError = require('../core/error').MongoNetworkError;
const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError;
const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
const CommandResult = require('../core/connection/command_result');
const StreamDescription = require('./stream_description').StreamDescription;
@@ -31,9 +32,13 @@ class Connection extends EventEmitter {
this.id = options.id;
this.address = streamIdentifier(stream);
this.bson = options.bson;
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0;
this.host = options.host || 'localhost';
this.port = options.port || 27017;
this.monitorCommands =
typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
this.serverApi = options.serverApi;
this.closed = false;
this.destroyed = false;
@@ -55,34 +60,9 @@ class Connection extends EventEmitter {
/* ignore errors, listen to `close` instead */
});
stream.on('close', () => {
if (this.closed) {
return;
}
this.closed = true;
this[kQueue].forEach(op =>
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`))
);
this[kQueue].clear();
this.emit('close');
});
stream.on('timeout', () => {
if (this.closed) {
return;
}
stream.destroy();
this.closed = true;
this[kQueue].forEach(op =>
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} timed out`))
);
this[kQueue].clear();
this.emit('close');
});
this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
stream.on('close', () => this.handleIssue({ isClose: true }));
stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));
// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
@@ -125,6 +105,39 @@ class Connection extends EventEmitter {
this[kLastUseTime] = now();
}
/**
* @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }} issue
*/
handleIssue(issue) {
if (this.closed) {
return;
}
if (issue.destroy) {
this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
this.closed = true;
for (const idAndOp of this[kQueue]) {
const op = idAndOp[1];
if (issue.isTimeout) {
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this.ismaster == null
})
);
} else if (issue.isClose) {
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
} else {
op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
}
this[kQueue].clear();
this.emit('close');
}
destroy(options, callback) {
if (typeof options === 'function') {
callback = options;
@@ -159,33 +172,58 @@ class Connection extends EventEmitter {
});
}
applyApiVersion(options) {
if (this.serverApi) {
options.serverApi = this.serverApi;
}
return options;
}
// Wire protocol methods
command(ns, cmd, options, callback) {
wp.command(makeServerTrampoline(this), ns, cmd, options, callback);
if (typeof options === 'function') {
callback = options;
options = {};
}
wp.command(makeServerTrampoline(this), ns, cmd, this.applyApiVersion(options), callback);
}
query(ns, cmd, cursorState, options, callback) {
wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
wp.query(
makeServerTrampoline(this),
ns,
cmd,
cursorState,
this.applyApiVersion(options),
callback
);
}
getMore(ns, cursorState, batchSize, options, callback) {
wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
wp.getMore(
makeServerTrampoline(this),
ns,
cursorState,
batchSize,
this.applyApiVersion(options),
callback
);
}
killCursors(ns, cursorState, callback) {
wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
wp.killCursors(makeServerTrampoline(this), ns, cursorState, this.applyApiVersion({}), callback);
}
insert(ns, ops, options, callback) {
wp.insert(makeServerTrampoline(this), ns, ops, options, callback);
wp.insert(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback);
}
update(ns, ops, options, callback) {
wp.update(makeServerTrampoline(this), ns, ops, options, callback);
wp.update(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback);
}
remove(ns, ops, options, callback) {
wp.remove(makeServerTrampoline(this), ns, ops, options, callback);
wp.remove(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback);
}
}
@@ -218,6 +256,7 @@ function messageHandler(conn) {
}
const operationDescription = conn[kQueue].get(message.responseTo);
const callback = operationDescription.cb;
// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
@@ -226,10 +265,7 @@ function messageHandler(conn) {
if (message.moreToCome) {
// requeue the callback for next synthetic request
conn[kQueue].set(message.requestId, operationDescription);
}
const callback = operationDescription.cb;
if (operationDescription.socketTimeoutOverride) {
} else if (operationDescription.socketTimeoutOverride) {
conn[kStream].setTimeout(conn.socketTimeout);
}
@@ -308,6 +344,7 @@ function write(command, options, callback) {
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
bsonRegExp: typeof options.bsonRegExp === 'boolean' ? options.bsonRegExp : false,
raw: typeof options.raw === 'boolean' ? options.raw : false
};