/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */
'use strict';
const EventEmitter = require(
'events');
const https = require(
'https');
const http = require(
'http');
const net = require(
'net');
const tls = require(
'tls');
const { randomBytes, createHash } = require(
'crypto');
const { Readable } = require(
'stream');
const { URL } = require(
'url');
const PerMessageDeflate = require(
'./permessage-deflate');
const Receiver = require(
'./receiver');
const Sender = require(
'./sender');
const {
BINARY_TYPES,
EMPTY_BUFFER,
GUID,
kForOnEventAttribute,
kListener,
kStatusCode,
kWebSocket,
NOOP
} = require(
'./constants');
const {
EventTarget: { addEventListener, removeEventListener }
} = require(
'./event-target');
const { format, parse } = require(
'./extension');
const { toBuffer } = require(
'./buffer-util');
// Maximum time, in milliseconds, allowed for the closing handshake to
// complete before the underlying socket is destroyed.
const closeTimeout = 30 * 1000;

// Marker set on a request object once the handshake has been aborted.
const kAborted = Symbol('kAborted');

// Accepted values for the `protocolVersion` option.
const protocolVersions = [8, 13];

// Ready state names; the index of each name is its numeric ready state.
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];

// Characters accepted in a subprotocol name. The literal must stay on a
// single line: a regular expression literal cannot contain a line terminator.
const subprotocolRegex = /^[!#$%&'*+\-.0-9A-Z^_`|a-z~]+$/;
/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
  /**
   * Create a new `WebSocket`.
   *
   * @param {(String|URL)} address The URL to which to connect
   * @param {(String|String[])} [protocols] The subprotocols
   * @param {Object} [options] Connection options
   */
  constructor(address, protocols, options) {
    super();

    this._binaryType = BINARY_TYPES[0];
    this._closeCode = 1006;
    this._closeFrameReceived = false;
    this._closeFrameSent = false;
    this._closeMessage = EMPTY_BUFFER;
    this._closeTimer = null;
    this._extensions = {};
    this._paused = false;
    this._protocol = '';
    this._readyState = WebSocket.CONNECTING;
    this._receiver = null;
    this._sender = null;
    this._socket = null;

    //
    // A `null` address means that no client handshake is started here; the
    // instance is only flagged as a server side socket.
    //
    if (address === null) {
      this._isServer = true;
      return;
    }

    this._bufferedAmount = 0;
    this._isServer = false;
    this._redirects = 0;

    //
    // Normalize the second argument: it can be omitted, a single subprotocol,
    // a list of subprotocols, or the options object itself.
    //
    let subprotocols = protocols;
    let clientOptions = options;

    if (subprotocols === undefined) {
      subprotocols = [];
    } else if (!Array.isArray(subprotocols)) {
      const isOptionsObject =
        typeof subprotocols === 'object' && subprotocols !== null;

      if (isOptionsObject) {
        clientOptions = subprotocols;
        subprotocols = [];
      } else {
        subprotocols = [subprotocols];
      }
    }

    initAsClient(this, address, subprotocols, clientOptions);
  }

  /**
   * This deviates from the WHATWG interface since ws doesn't support the
   * required default "blob" type (instead we define a custom "nodebuffer"
   * type).
   *
   * @type {String}
   */
  get binaryType() {
    return this._binaryType;
  }

  set binaryType(type) {
    // Values that are not listed in `BINARY_TYPES` are silently ignored.
    if (!BINARY_TYPES.includes(type)) return;

    this._binaryType = type;

    //
    // Allow to change `binaryType` on the fly.
    //
    const receiver = this._receiver;
    if (receiver) receiver._binaryType = type;
  }

  /**
   * @type {Number}
   */
  get bufferedAmount() {
    const socket = this._socket;

    // Without a socket only the locally tracked counter is available.
    if (!socket) return this._bufferedAmount;

    return socket._writableState.length + this._sender._bufferedBytes;
  }

  /**
   * @type {String}
   */
  get extensions() {
    const names = Object.keys(this._extensions);
    return names.join();
  }

  /**
   * @type {Boolean}
   */
  get isPaused() {
    return this._paused;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onclose() {
    return null;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onerror() {
    return null;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onopen() {
    return null;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onmessage() {
    return null;
  }

  /**
   * @type {String}
   */
  get protocol() {
    return this._protocol;
  }

  /**
   * @type {Number}
   */
  get readyState() {
    return this._readyState;
  }

  /**
   * @type {String}
   */
  get url() {
    return this._url;
  }

  /**
   * Set up the socket and the internal resources.
   *
   * @param {(net.Socket|tls.Socket)} socket The network socket between the
   *     server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Object} options Options object
   * @param {Function} [options.generateMask] The function used to generate the
   *     masking key
   * @param {Number} [options.maxPayload=0] The maximum allowed message size
   * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
   *     not to skip UTF-8 validation for text and close messages
   * @private
   */
  setSocket(socket, head, options) {
    const receiver = new Receiver({
      binaryType: this.binaryType,
      extensions: this._extensions,
      isServer: this._isServer,
      maxPayload: options.maxPayload,
      skipUTF8Validation: options.skipUTF8Validation
    });

    this._sender = new Sender(socket, this._extensions, options.generateMask);
    this._receiver = receiver;
    this._socket = socket;

    // Back references used by the listeners to find this instance.
    receiver[kWebSocket] = this;
    socket[kWebSocket] = this;

    const receiverListeners = [
      ['conclude', receiverOnConclude],
      ['drain', receiverOnDrain],
      ['error', receiverOnError],
      ['message', receiverOnMessage],
      ['ping', receiverOnPing],
      ['pong', receiverOnPong]
    ];

    for (const [eventName, listener] of receiverListeners) {
      receiver.on(eventName, listener);
    }

    socket.setTimeout(0);
    socket.setNoDelay();

    // Put back the bytes that were received along with the upgrade.
    if (head.length > 0) socket.unshift(head);

    const socketListeners = [
      ['close', socketOnClose],
      ['data', socketOnData],
      ['end', socketOnEnd],
      ['error', socketOnError]
    ];

    for (const [eventName, listener] of socketListeners) {
      socket.on(eventName, listener);
    }

    this._readyState = WebSocket.OPEN;
    this.emit('open');
  }

  /**
   * Emit the `'close'` event.
   *
   * @private
   */
  emitClose() {
    //
    // The extension and the receiver need to be cleaned up only if a socket
    // was assigned.
    //
    if (this._socket) {
      const deflate = this._extensions[PerMessageDeflate.extensionName];

      if (deflate) deflate.cleanup();

      this._receiver.removeAllListeners();
    }

    this._readyState = WebSocket.CLOSED;
    this.emit('close', this._closeCode, this._closeMessage);
  }

  /**
   * Start a closing handshake.
   *
   * Nothing is done if the connection is already closed and the opening
   * handshake is aborted if the connection is not established yet. While
   * closing, the socket is ended once the close frame has been sent and
   * either a close frame has been received or the receiver has errored.
   * Otherwise a close frame is sent and a timer is armed to destroy the
   * socket if the handshake does not complete in time.
   *
   * @param {Number} [code] Status code explaining why the connection is closing
   * @param {(String|Buffer)} [data] The reason why the connection is
   *     closing
   * @public
   */
  close(code, data) {
    const state = this.readyState;

    if (state === WebSocket.CLOSED) return;

    if (state === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    const endSocketIfPeerIsDone = () => {
      const peerIsDone =
        this._closeFrameReceived || this._receiver._writableState.errorEmitted;

      if (peerIsDone) this._socket.end();
    };

    if (state === WebSocket.CLOSING) {
      if (this._closeFrameSent) endSocketIfPeerIsDone();
      return;
    }

    this._readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      //
      // This error is handled by the `'error'` listener on the socket. We only
      // want to know if the close frame has been sent here.
      //
      if (err) return;

      this._closeFrameSent = true;
      endSocketIfPeerIsDone();
    });

    //
    // Specify a timeout for the closing handshake to complete.
    //
    const socket = this._socket;
    this._closeTimer = setTimeout(socket.destroy.bind(socket), closeTimeout);
  }

  /**
   * Pause the socket.
   *
   * @public
   */
  pause() {
    const state = this.readyState;

    if (state === WebSocket.CONNECTING || state === WebSocket.CLOSED) return;

    this._paused = true;
    this._socket.pause();
  }

  /**
   * Send a ping.
   *
   * @param {*} [data] The data to send
   * @param {Boolean} [mask] Indicates whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when the ping is sent
   * @public
   */
  ping(data, mask, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    let payload = data;
    let shouldMask = mask;
    let callback = cb;

    // The callback may be passed in place of `data` or `mask`.
    if (typeof payload === 'function') {
      callback = payload;
      payload = undefined;
      shouldMask = undefined;
    } else if (typeof shouldMask === 'function') {
      callback = shouldMask;
      shouldMask = undefined;
    }

    if (typeof payload === 'number') payload = payload.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, payload, callback);
      return;
    }

    // Unless told otherwise, mask only when this is not a server side socket.
    if (shouldMask === undefined) shouldMask = !this._isServer;

    this._sender.ping(payload || EMPTY_BUFFER, shouldMask, callback);
  }

  /**
   * Send a pong.
   *
   * @param {*} [data] The data to send
   * @param {Boolean} [mask] Indicates whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when the pong is sent
   * @public
   */
  pong(data, mask, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    let payload = data;
    let shouldMask = mask;
    let callback = cb;

    // The callback may be passed in place of `data` or `mask`.
    if (typeof payload === 'function') {
      callback = payload;
      payload = undefined;
      shouldMask = undefined;
    } else if (typeof shouldMask === 'function') {
      callback = shouldMask;
      shouldMask = undefined;
    }

    if (typeof payload === 'number') payload = payload.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, payload, callback);
      return;
    }

    // Unless told otherwise, mask only when this is not a server side socket.
    if (shouldMask === undefined) shouldMask = !this._isServer;

    this._sender.pong(payload || EMPTY_BUFFER, shouldMask, callback);
  }

  /**
   * Resume the socket.
   *
   * @public
   */
  resume() {
    const state = this.readyState;

    if (state === WebSocket.CONNECTING || state === WebSocket.CLOSED) return;

    this._paused = false;

    // The socket stays paused while the receiver is waiting for a drain.
    const receiverNeedsDrain = this._receiver._writableState.needDrain;
    if (!receiverNeedsDrain) this._socket.resume();
  }

  /**
   * Send a data message.
   *
   * @param {*} data The message to send
   * @param {Object} [options] Options object
   * @param {Boolean} [options.binary] Specifies whether `data` is binary or
   *     text
   * @param {Boolean} [options.compress] Specifies whether or not to compress
   *     `data`
   * @param {Boolean} [options.fin=true] Specifies whether the fragment is the
   *     last one
   * @param {Boolean} [options.mask] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when data is written out
   * @public
   */
  send(data, options, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    let payload = data;
    let sendOptions = options;
    let callback = cb;

    // The callback may be passed in place of `options`.
    if (typeof sendOptions === 'function') {
      callback = sendOptions;
      sendOptions = {};
    }

    if (typeof payload === 'number') payload = payload.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, payload, callback);
      return;
    }

    const opts = {
      binary: typeof payload !== 'string',
      mask: !this._isServer,
      compress: true,
      fin: true,
      ...sendOptions
    };

    // Compression is possible only if the extension is in use.
    const deflate = this._extensions[PerMessageDeflate.extensionName];
    if (!deflate) opts.compress = false;

    this._sender.send(payload || EMPTY_BUFFER, opts, callback);
  }

  /**
   * Forcibly close the connection.
   *
   * @public
   */
  terminate() {
    const state = this.readyState;

    if (state === WebSocket.CLOSED) return;

    if (state === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    const socket = this._socket;

    if (socket) {
      this._readyState = WebSocket.CLOSING;
      socket.destroy();
    }
  }
}
/**
 * Ready state constants, defined as enumerable properties on both the
 * `WebSocket` class and `WebSocket.prototype`. The value of each constant is
 * the index of its name in `readyStates`.
 *
 * @constant {Number} CONNECTING
 * @constant {Number} OPEN
 * @constant {Number} CLOSING
 * @constant {Number} CLOSED
 * @memberof WebSocket
 * @memberof WebSocket.prototype
 */
readyStates.forEach((stateName, stateValue) => {
  for (const target of [WebSocket, WebSocket.prototype]) {
    Object.defineProperty(target, stateName, {
      enumerable: true,
      value: stateValue
    });
  }
});
//
// Make the accessors defined in the class body enumerable.
//
for (const property of [
  'binaryType',
  'bufferedAmount',
  'extensions',
  'isPaused',
  'protocol',
  'readyState',
  'url'
]) {
  Object.defineProperty(WebSocket.prototype, property, { enumerable: true });
}
//
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
for (const method of ['open', 'error', 'close', 'message']) {
  Object.defineProperty(WebSocket.prototype, `on${method}`, {
    enumerable: true,
    get() {
      // The handler is the one wrapped by the listener flagged as attribute.
      const attributeListener = this.listeners(method).find(
        (listener) => listener[kForOnEventAttribute]
      );

      return attributeListener ? attributeListener[kListener] : null;
    },
    set(handler) {
      // Drop the listener installed by a previous assignment, if any.
      const previous = this.listeners(method).find(
        (listener) => listener[kForOnEventAttribute]
      );

      if (previous) this.removeListener(method, previous);

      if (typeof handler !== 'function') return;

      this.addEventListener(method, handler, {
        [kForOnEventAttribute]: true
      });
    }
  });
}
//
// Use the `addEventListener()` and `removeEventListener()` implementations
// imported from `./event-target` and export the class.
//
WebSocket.prototype.addEventListener = addEventListener;
WebSocket.prototype.removeEventListener = removeEventListener;
module.exports = WebSocket;
/**
 * Initialize a WebSocket client.
 *
 * Validates the arguments, builds the HTTP(S) upgrade request, optionally
 * follows redirects, and validates the server response before handing the
 * socket over to `websocket.setSocket()`.
 *
 * @param {WebSocket} websocket The client to initialize
 * @param {(String|URL)} address The URL to which to connect
 * @param {Array} protocols The subprotocols
 * @param {Object} [options] Connection options
 * @param {Boolean} [options.followRedirects=false] Whether or not to follow
 *     redirects
 * @param {Function} [options.generateMask] The function used to generate the
 *     masking key
 * @param {Number} [options.handshakeTimeout] Timeout in milliseconds for the
 *     handshake request
 * @param {Number} [options.maxPayload=104857600] The maximum allowed message
 *     size
 * @param {Number} [options.maxRedirects=10] The maximum number of redirects
 *     allowed
 * @param {String} [options.origin] Value of the `Origin` or
 *     `Sec-WebSocket-Origin` header
 * @param {(Boolean|Object)} [options.perMessageDeflate=true] Enable/disable
 *     permessage-deflate
 * @param {Number} [options.protocolVersion=13] Value of the
 *     `Sec-WebSocket-Version` header
 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
 *     not to skip UTF-8 validation for text and close messages
 * @throws {RangeError} If the protocol version is not supported
 * @throws {SyntaxError} If the URL or a subprotocol is invalid. An invalid
 *     URL found while following a redirect is reported through the `'error'`
 *     event instead
 * @private
 */
function initAsClient(websocket, address, protocols, options) {
  //
  // The keys listed after `...options` can't be overridden by the user.
  //
  const opts = {
    protocolVersion: protocolVersions[1],
    maxPayload: 100 * 1024 * 1024,
    skipUTF8Validation: false,
    perMessageDeflate: true,
    followRedirects: false,
    maxRedirects: 10,
    ...options,
    createConnection: undefined,
    socketPath: undefined,
    hostname: undefined,
    protocol: undefined,
    timeout: undefined,
    method: 'GET',
    host: undefined,
    path: undefined,
    port: undefined
  };

  if (!protocolVersions.includes(opts.protocolVersion)) {
    throw new RangeError(
      `Unsupported protocol version: ${opts.protocolVersion} ` +
        `(supported versions: ${protocolVersions.join(', ')})`
    );
  }

  let parsedUrl;

  if (address instanceof URL) {
    parsedUrl = address;
    websocket._url = address.href;
  } else {
    try {
      parsedUrl = new URL(address);
    } catch (e) {
      throw new SyntaxError(`Invalid URL: ${address}`);
    }

    websocket._url = address;
  }

  const isSecure = parsedUrl.protocol === 'wss:';
  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
  let invalidURLMessage;

  if (parsedUrl.protocol !== 'ws:' && !isSecure && !isUnixSocket) {
    invalidURLMessage =
      'The URL\'s protocol must be one of "ws:", "wss:", or "ws+unix:"';
  } else if (isUnixSocket && !parsedUrl.pathname) {
    invalidURLMessage = "The URL's pathname is empty";
  } else if (parsedUrl.hash) {
    invalidURLMessage = 'The URL contains a fragment identifier';
  }

  if (invalidURLMessage) {
    const err = new SyntaxError(invalidURLMessage);

    //
    // Throw only for the URL provided by the caller. A redirect target is
    // processed asynchronously, so the error is emitted instead.
    //
    if (websocket._redirects === 0) {
      throw err;
    } else {
      emitErrorAndClose(websocket, err);
      return;
    }
  }

  const defaultPort = isSecure ? 443 : 80;
  const key = randomBytes(16).toString('base64');
  const request = isSecure ? https.request : http.request;
  const protocolSet = new Set();
  let perMessageDeflate;

  opts.createConnection = isSecure ? tlsConnect : netConnect;
  opts.defaultPort = opts.defaultPort || defaultPort;
  opts.port = parsedUrl.port || defaultPort;
  // Strip the square brackets from IPv6 literals.
  opts.host = parsedUrl.hostname.startsWith('[')
    ? parsedUrl.hostname.slice(1, -1)
    : parsedUrl.hostname;
  opts.headers = {
    ...opts.headers,
    'Sec-WebSocket-Version': opts.protocolVersion,
    'Sec-WebSocket-Key': key,
    Connection: 'Upgrade',
    Upgrade: 'websocket'
  };
  opts.path = parsedUrl.pathname + parsedUrl.search;
  opts.timeout = opts.handshakeTimeout;

  if (opts.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
      false,
      opts.maxPayload
    );
    opts.headers['Sec-WebSocket-Extensions'] = format({
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    });
  }

  if (protocols.length) {
    for (const protocol of protocols) {
      if (
        typeof protocol !== 'string' ||
        !subprotocolRegex.test(protocol) ||
        protocolSet.has(protocol)
      ) {
        throw new SyntaxError(
          'An invalid or duplicated subprotocol was specified'
        );
      }

      protocolSet.add(protocol);
    }

    opts.headers['Sec-WebSocket-Protocol'] = protocols.join(',');
  }

  if (opts.origin) {
    if (opts.protocolVersion < 13) {
      opts.headers['Sec-WebSocket-Origin'] = opts.origin;
    } else {
      opts.headers.Origin = opts.origin;
    }
  }

  if (parsedUrl.username || parsedUrl.password) {
    opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  }

  if (isUnixSocket) {
    // The socket path and the request path are separated by a colon.
    const parts = opts.path.split(':');

    opts.socketPath = parts[0];
    opts.path = parts[1];
  }

  let req;

  if (opts.followRedirects) {
    if (websocket._redirects === 0) {
      websocket._originalUnixSocket = isUnixSocket;
      websocket._originalSecure = isSecure;
      websocket._originalHostOrSocketPath = isUnixSocket
        ? opts.socketPath
        : parsedUrl.host;

      const headers = options && options.headers;

      //
      // Shallow copy the user provided options so that headers can be changed
      // without mutating the original object.
      //
      options = { ...options, headers: {} };

      if (headers) {
        for (const [name, value] of Object.entries(headers)) {
          options.headers[name.toLowerCase()] = value;
        }
      }
    } else if (websocket.listenerCount('redirect') === 0) {
      const isSameHost = isUnixSocket
        ? websocket._originalUnixSocket
          ? opts.socketPath === websocket._originalHostOrSocketPath
          : false
        : websocket._originalUnixSocket
        ? false
        : parsedUrl.host === websocket._originalHostOrSocketPath;

      if (!isSameHost || (websocket._originalSecure && !isSecure)) {
        //
        // Match curl 7.77.0 behavior and drop the following headers. These
        // headers are also dropped when following a redirect to a subdomain.
        //
        delete opts.headers.authorization;
        delete opts.headers.cookie;

        if (!isSameHost) delete opts.headers.host;

        opts.auth = undefined;
      }
    }

    //
    // Match curl 7.77.0 behavior and make the first `Authorization` header win.
    // If the `Authorization` header is set, then there is nothing to do as it
    // will take precedence.
    //
    if (opts.auth && !options.headers.authorization) {
      options.headers.authorization =
        'Basic ' + Buffer.from(opts.auth).toString('base64');
    }

    req = websocket._req = request(opts);

    if (websocket._redirects) {
      //
      // Unlike what is done for the `'upgrade'` event, no early exit is
      // triggered here if the user calls `websocket.close()` or
      // `websocket.terminate()` from a listener of the `'redirect'` event. This
      // is because the user can also call `request.destroy()` with an error
      // before calling `websocket.close()` or `websocket.terminate()` and this
      // would result in an error being emitted on the `request` object with no
      // `'error'` event listeners attached.
      //
      websocket.emit('redirect', websocket.url, req);
    }
  } else {
    req = websocket._req = request(opts);
  }

  if (opts.timeout) {
    req.on('timeout', () => {
      abortHandshake(websocket, req, 'Opening handshake has timed out');
    });
  }

  req.on('error', (err) => {
    // Ignore errors of requests that were already handled or aborted.
    if (req === null || req[kAborted]) return;

    req = websocket._req = null;
    emitErrorAndClose(websocket, err);
  });

  req.on('response', (res) => {
    const location = res.headers.location;
    const statusCode = res.statusCode;

    if (
      location &&
      opts.followRedirects &&
      statusCode >= 300 &&
      statusCode < 400
    ) {
      if (++websocket._redirects > opts.maxRedirects) {
        abortHandshake(websocket, req, 'Maximum redirects exceeded');
        return;
      }

      req.abort();

      let addr;

      try {
        addr = new URL(location, address);
      } catch (e) {
        const err = new SyntaxError(`Invalid URL: ${location}`);
        emitErrorAndClose(websocket, err);
        return;
      }

      initAsClient(websocket, addr, protocols, options);
    } else if (!websocket.emit('unexpected-response', req, res)) {
      abortHandshake(
        websocket,
        req,
        `Unexpected server response: ${res.statusCode}`
      );
    }
  });

  req.on('upgrade', (res, socket, head) => {
    websocket.emit('upgrade', res);

    //
    // The user may have closed the connection from a listener of the
    // `'upgrade'` event.
    //
    if (websocket.readyState !== WebSocket.CONNECTING) return;

    req = websocket._req = null;

    //
    // Guard against a missing `Upgrade` header so that the handshake is
    // aborted with a meaningful error instead of throwing a `TypeError`.
    //
    const upgrade = res.headers.upgrade;

    if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
      abortHandshake(websocket, socket, 'Invalid Upgrade header');
      return;
    }

    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
      return;
    }

    const serverProt = res.headers['sec-websocket-protocol'];
    let protError;

    if (serverProt !== undefined) {
      if (!protocolSet.size) {
        protError = 'Server sent a subprotocol but none was requested';
      } else if (!protocolSet.has(serverProt)) {
        protError = 'Server sent an invalid subprotocol';
      }
    } else if (protocolSet.size) {
      protError = 'Server sent no subprotocol';
    }

    if (protError) {
      abortHandshake(websocket, socket, protError);
      return;
    }

    if (serverProt) websocket._protocol = serverProt;

    const secWebSocketExtensions = res.headers['sec-websocket-extensions'];

    if (secWebSocketExtensions !== undefined) {
      if (!perMessageDeflate) {
        const message =
          'Server sent a Sec-WebSocket-Extensions header but no extension ' +
          'was requested';
        abortHandshake(websocket, socket, message);
        return;
      }

      let extensions;

      try {
        extensions = parse(secWebSocketExtensions);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Extensions header';
        abortHandshake(websocket, socket, message);
        return;
      }

      const extensionNames = Object.keys(extensions);

      if (
        extensionNames.length !== 1 ||
        extensionNames[0] !== PerMessageDeflate.extensionName
      ) {
        const message = 'Server indicated an extension that was not requested';
        abortHandshake(websocket, socket, message);
        return;
      }

      try {
        perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Extensions header';
        abortHandshake(websocket, socket, message);
        return;
      }

      websocket._extensions[PerMessageDeflate.extensionName] =
        perMessageDeflate;
    }

    websocket.setSocket(socket, head, {
      generateMask: opts.generateMask,
      maxPayload: opts.maxPayload,
      skipUTF8Validation: opts.skipUTF8Validation
    });
  });

  req.end();
}
/**
 * Emit the `'error'` and `'close'` events.
 *
 * The ready state is set to `CLOSING` before the `'error'` event is emitted.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {Error} err The error to emit
 * @private
 */
function emitErrorAndClose(websocket, err) {
websocket._readyState = WebSocket.CLOSING;
websocket.emit(
'error', err);
websocket.emitClose();
}
/**
 * Create a `net.Socket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {net.Socket} The newly created socket used to start the connection
 * @private
 */
function netConnect(options) {
// `path` is overwritten with `socketPath` before the options are handed to
// `net.connect()`.
options.path = options.socketPath;
return net.connect(options);
}
/**
 * Create a `tls.TLSSocket` and initiate a connection.
 *
 * The `path` option is cleared and, unless a server name (even an empty one)
 * was provided, `servername` is derived from `host`: an empty string when
 * the host is an IP address, the host itself otherwise.
 *
 * @param {Object} options Connection options
 * @return {tls.TLSSocket} The newly created socket used to start the connection
 * @private
 */
function tlsConnect(options) {
  options.path = undefined;

  const { servername } = options;
  const isServernameProvided = Boolean(servername) || servername === '';

  if (!isServernameProvided) {
    const hostIsIp = net.isIP(options.host);
    options.servername = hostIsIp ? '' : options.host;
  }

  return tls.connect(options);
}
/**
 * Abort the handshake and emit an error.
 *
 * A stream with a `setHeader` method is handled as a request: it is flagged
 * as aborted, aborted, and the `'error'` and `'close'` events are emitted on
 * the next tick. Any other stream is handled as a socket: it is destroyed
 * with the error and its `'error'` and `'close'` events are forwarded.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {(http.ClientRequest|net.Socket|tls.Socket)} stream The request to
 *     abort or the socket to destroy
 * @param {String} message The error message
 * @private
 */
function abortHandshake(websocket, stream, message) {
  websocket._readyState = WebSocket.CLOSING;

  const err = new Error(message);
  Error.captureStackTrace(err, abortHandshake);

  if (!stream.setHeader) {
    stream.destroy(err);
    stream.once('error', websocket.emit.bind(websocket, 'error'));
    stream.once('close', websocket.emitClose.bind(websocket));
    return;
  }

  stream[kAborted] = true;
  stream.abort();

  const { socket } = stream;

  if (socket && !socket.destroyed) {
    //
    // On Node.js >= 14.3.0 `request.abort()` does not destroy the socket if
    // called after the request completed. See
    // https://github.com/websockets/ws/issues/1869.
    //
    socket.destroy();
  }

  process.nextTick(emitErrorAndClose, websocket, err);
}
/**
 * Handle cases where the `ping()`, `pong()`, or `send()` methods are called
 * when the `readyState` attribute is `CLOSING` or `CLOSED`.
 *
 * The size of the data is added to the buffered amount and the callback, if
 * any, is called with an error describing the current ready state.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {*} [data] The data to send
 * @param {Function} [cb] Callback
 * @private
 */
function sendAfterClose(websocket, data, cb) {
  if (data) {
    const { length } = toBuffer(data);

    //
    // The `_bufferedAmount` property is used only when the peer is a client and
    // the opening handshake fails. Under these circumstances, in fact, the
    // `setSocket()` method is not called, so the `_socket` and `_sender`
    // properties are set to `null`.
    //
    if (websocket._socket) {
      websocket._sender._bufferedBytes += length;
    } else {
      websocket._bufferedAmount += length;
    }
  }

  if (!cb) return;

  const state = websocket.readyState;
  const stateName = readyStates[websocket.readyState];

  cb(new Error(`WebSocket is not open: readyState ${state} (${stateName})`));
}
/**
 * The listener of the `Receiver` `'conclude'` event.
 *
 * Records the received close code and reason and, if the socket is still
 * attached to the WebSocket, stops feeding the receiver and starts or
 * continues the closing handshake.
 *
 * @param {Number} code The status code
 * @param {Buffer} reason The reason for closing
 * @private
 */
function receiverOnConclude(code, reason) {
  const websocket = this[kWebSocket];

  websocket._closeFrameReceived = true;
  websocket._closeMessage = reason;
  websocket._closeCode = code;

  const socket = websocket._socket;

  // The back reference is cleared once the socket has been closed.
  if (socket[kWebSocket] === undefined) return;

  socket.removeListener('data', socketOnData);
  process.nextTick(resume, socket);

  // Status code 1005 is not passed on to `close()`.
  if (code === 1005) {
    websocket.close();
  } else {
    websocket.close(code, reason);
  }
}
/**
 * The listener of the `Receiver` `'drain'` event.
 *
 * Resumes the socket unless the WebSocket is paused.
 *
 * @private
 */
function receiverOnDrain() {
const websocket =
this[kWebSocket];
if (!websocket.isPaused) websocket._socket.resume();
}
/**
 * The listener of the `Receiver` `'error'` event.
 *
 * If the socket is still attached to the WebSocket, stops feeding the
 * receiver and starts the closing handshake using the status code carried by
 * the error. The error is then emitted on the WebSocket.
 *
 * @param {(RangeError|Error)} err The emitted error
 * @private
 */
function receiverOnError(err) {
  const websocket = this[kWebSocket];
  const socket = websocket._socket;
  const isSocketAttached = socket[kWebSocket] !== undefined;

  if (isSocketAttached) {
    socket.removeListener('data', socketOnData);

    //
    // On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See
    // https://github.com/websockets/ws/issues/1940.
    //
    process.nextTick(resume, socket);

    websocket.close(err[kStatusCode]);
  }

  websocket.emit('error', err);
}
/**
 * The listener of the `Receiver` `'finish'` event.
 *
 * `socketOnClose()` also registers it for the `'error'` event of the
 * receiver. In both cases the `'close'` event is emitted on the WebSocket.
 *
 * @private
 */
function receiverOnFinish() {
this[kWebSocket].emitClose();
}
/**
 * The listener of the `Receiver` `'message'` event.
 *
 * Forwards the message to the listeners of the WebSocket `'message'` event.
 *
 * @param {(Buffer|ArrayBuffer|Buffer[])} data The message
 * @param {Boolean} isBinary Specifies whether the message is binary or not
 * @private
 */
function receiverOnMessage(data, isBinary) {
this[kWebSocket].emit(
'message', data, isBinary);
}
/**
 * The listener of the `Receiver` `'ping'` event.
 *
 * Replies with a pong carrying the same data, then emits the `'ping'` event
 * on the WebSocket.
 *
 * @param {Buffer} data The data included in the ping frame
 * @private
 */
function receiverOnPing(data) {
const websocket =
this[kWebSocket];
// The pong is masked only when this is not a server side socket.
websocket.pong(data, !websocket._isServer, NOOP);
websocket.emit(
'ping', data);
}
/**
 * The listener of the `Receiver` `'pong'` event.
 *
 * Forwards the data to the listeners of the WebSocket `'pong'` event.
 *
 * @param {Buffer} data The data included in the pong frame
 * @private
 */
function receiverOnPong(data) {
this[kWebSocket].emit(
'pong', data);
}
/**
 * Resume a readable stream.
 *
 * Defined as a standalone function so that it can be passed to
 * `process.nextTick()` along with the stream to resume.
 *
 * @param {Readable} stream The readable stream
 * @private
 */
function resume(stream) {
stream.resume();
}
/**
 * The listener of the `net.Socket` `'close'` event.
 *
 * Detaches the socket from the WebSocket, flushes any data still buffered in
 * the socket to the receiver, ends the receiver, and emits the `'close'`
 * event as soon as the receiver has finished or errored.
 *
 * @private
 */
function socketOnClose() {
  const websocket = this[kWebSocket];

  this.removeListener('close', socketOnClose);
  this.removeListener('data', socketOnData);
  this.removeListener('end', socketOnEnd);

  websocket._readyState = WebSocket.CLOSING;

  const receiver = websocket._receiver;

  //
  // The close frame might not have been received or the `'end'` event emitted,
  // for example, if the socket was destroyed due to an error. Ensure that the
  // `receiver` stream is closed after writing any remaining buffered data to
  // it. If the readable side of the socket is in flowing mode then there is no
  // buffered data as everything has been already written and `readable.read()`
  // will return `null`. If instead, the socket is paused, any possible buffered
  // data will be read as a single chunk.
  //
  const mayHaveBufferedData =
    !this._readableState.endEmitted &&
    !websocket._closeFrameReceived &&
    !receiver._writableState.errorEmitted;

  if (mayHaveBufferedData) {
    const chunk = websocket._socket.read();

    if (chunk !== null) receiver.write(chunk);
  }

  receiver.end();

  this[kWebSocket] = undefined;

  clearTimeout(websocket._closeTimer);

  const receiverState = receiver._writableState;

  if (receiverState.finished || receiverState.errorEmitted) {
    websocket.emitClose();
    return;
  }

  // Wait for the receiver to process the remaining data.
  receiver.on('error', receiverOnFinish);
  receiver.on('finish', receiverOnFinish);
}
/**
 * The listener of the `net.Socket` `'data'` event.
 *
 * Writes the chunk to the receiver and pauses the socket when `write()`
 * returns `false`.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function socketOnData(chunk) {
if (!
this[kWebSocket]._receiver.write(chunk)) {
this.pause();
}
}
/**
 * The listener of the `net.Socket` `'end'` event.
 *
 * Marks the WebSocket as closing, ends the receiver, and ends the socket.
 *
 * @private
 */
function socketOnEnd() {
const websocket =
this[kWebSocket];
websocket._readyState = WebSocket.CLOSING;
websocket._receiver.end();
this.end();
}
/**
 * The listener of the `net.Socket` `'error'` event.
 *
 * Replaces itself with a no-op listener and, if the socket is still attached
 * to a WebSocket, marks the WebSocket as closing and destroys the socket.
 *
 * @private
 */
function socketOnError() {
  const websocket = this[kWebSocket];

  this.removeListener('error', socketOnError);
  this.on('error', NOOP);

  if (!websocket) return;

  websocket._readyState = WebSocket.CLOSING;
  this.destroy();
}