/*!
 * ws: a node.js websocket client
 * Copyright(c) 2011 Einar Otto Stangvik
 * MIT Licensed
 */

'use strict';

const EventEmitter = require('events');
const crypto = require('crypto');
const Ultron = require('ultron');
const https = require('https');
const http = require('http');
const url = require('url');

const PerMessageDeflate = require('./PerMessageDeflate');
const EventTarget = require('./EventTarget');
const Extensions = require('./Extensions');
const constants = require('./Constants');
const Receiver = require('./Receiver');
const Sender = require('./Sender');

// Values accepted for the `protocolVersion` client option.
const protocolVersions = [8, 13];
const closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly.

/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
  /**
   * Create a new `WebSocket`.
   *
   * When `address` is an array it is treated as `[req, socket, head]` and the
   * instance is initialized as a server-side client; otherwise a client
   * handshake is started against `address`.
   *
   * @param {String} address The URL to which to connect
   * @param {(String|String[])} protocols The subprotocols
   * @param {Object} options Connection options
   */
  constructor (address, protocols, options) {
    super();

    //
    // `protocols` is optional: a non-array, non-string second argument is
    // interpreted as the `options` object.
    //
    if (!protocols) {
      protocols = [];
    } else if (typeof protocols === 'string') {
      protocols = [protocols];
    } else if (!Array.isArray(protocols)) {
      options = protocols;
      protocols = [];
    }

    this.readyState = WebSocket.CONNECTING;
    this.bytesReceived = 0;
    this.extensions = {};
    this.protocol = '';

    this._binaryType = constants.BINARY_TYPES[0];
    // Bound once so it can be used directly as an event listener / timer callback.
    this._finalize = this.finalize.bind(this);
    this._finalizeCalled = false;
    this._closeMessage = null;
    this._closeTimer = null;
    this._closeCode = null;
    this._receiver = null;
    this._sender = null;
    this._socket = null;
    this._ultron = null;

    if (Array.isArray(address)) {
      initAsServerClient.call(this, address[0], address[1], address[2], options);
    } else {
      initAsClient.call(this, address, protocols, options);
    }
  }

  get CONNECTING () { return WebSocket.CONNECTING; }
  get CLOSING () { return WebSocket.CLOSING; }
  get CLOSED () { return WebSocket.CLOSED; }
  get OPEN () { return WebSocket.OPEN; }

  /**
   * The sum of the socket's `bufferSize` and the sender's `bufferedBytes`, or
   * `0` when there is no socket.
   *
   * @type {Number}
   */
  get bufferedAmount () {
    let amount = 0;

    if (this._socket) {
      amount = this._socket.bufferSize + this._sender.bufferedBytes;
    }

    return amount;
  }

  /**
   * This deviates from the WHATWG interface since ws doesn't support the required
   * default "blob" type (instead we define a custom "nodebuffer" type).
   *
   * Values that are not listed in `constants.BINARY_TYPES` are silently ignored
   * by the setter.
   *
   * @type {String}
   */
  get binaryType () {
    return this._binaryType;
  }

  set binaryType (type) {
    if (constants.BINARY_TYPES.indexOf(type) < 0) return;

    this._binaryType = type;

    //
    // Allow to change `binaryType` on the fly.
    //
    if (this._receiver) this._receiver.binaryType = type;
  }

  /**
   * Set up the socket and the internal resources, then move to the `OPEN` state
   * and emit `open`.
   *
   * @param {net.Socket} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @private
   */
  setSocket (socket, head) {
    socket.setTimeout(0);
    socket.setNoDelay();

    this._receiver = new Receiver(this.extensions, this.maxPayload, this.binaryType);
    this._sender = new Sender(socket, this.extensions);
    this._ultron = new Ultron(socket);
    this._socket = socket;

    // Socket cleanup handlers: any of these events finalizes the connection.
    this._ultron.on('close', this._finalize);
    this._ultron.on('error', this._finalize);
    this._ultron.on('end', this._finalize);

    //
    // Push `head` back onto the socket so that it is delivered through the
    // `data` listener registered below, like any other chunk.
    //
    if (head && head.length > 0) {
      socket.unshift(head);
      head = null;
    }

    // Every chunk read from the socket is counted and handed to the receiver.
    this._ultron.on('data', (data) => {
      this.bytesReceived += data.length;
      this._receiver.add(data);
    });

    // Receiver event handlers.
    this._receiver.onmessage = (data, flags) => this.emit('message', data, flags);
    this._receiver.onping = (data, flags) => {
      // Answer with a pong first; `failSilently` is set so this never throws.
      this.pong(data, !this._isServer, true);
      this.emit('ping', data, flags);
    };
    this._receiver.onpong = (data, flags) => this.emit('pong', data, flags);
    this._receiver.onclose = (code, reason) => {
      // Remember what the peer sent so it can be reported by the `close` event.
      this._closeMessage = reason;
      this._closeCode = code;
      this.close(code, reason);
    };
    this._receiver.onerror = (error, code) => {
      // Close the connection when the receiver reports a HyBi error code.
      this.close(code, '');
      this.emit('error', error);
    };

    // Sender event handlers.
    this._sender.onerror = (error) => {
      this.close(1002, '');
      this.emit('error', error);
    };

    this.readyState = WebSocket.OPEN;
    this.emit('open');
  }

  /**
   * Clean up and release internal resources. Only the first call has an effect.
   *
   * @param {(Boolean|Error)} error Indicates whether or not an error occurred
   * @private
   */
  finalize (error) {
    if (this._finalizeCalled) return;

    this.readyState = WebSocket.CLOSING;
    this._finalizeCalled = true;

    clearTimeout(this._closeTimer);
    this._closeTimer = null;

    //
    // If the connection was closed abnormally (with an error), or if the close
    // control frame was malformed or not received then the close code must be
    // 1006.
    //
    if (error) this._closeCode = 1006;

    if (this._socket) {
      this._ultron.destroy();
      // Keep an `error` listener on the socket: it destroys the socket on error.
      this._socket.on('error', function onerror () {
        this.destroy();
      });

      if (!error) this._socket.end();
      else this._socket.destroy();

      this._socket = null;
      this._ultron = null;
    }

    if (this._sender) {
      // Clears both the `onerror` handler and the reference to the sender.
      this._sender = this._sender.onerror = null;
    }

    if (this._receiver) {
      // `close` is emitted from the callback passed to the receiver's cleanup.
      this._receiver.cleanup(() => this.emitClose());
      this._receiver = null;
    } else {
      this.emitClose();
    }
  }

  /**
   * Emit the `close` event, release the extensions and drop all listeners.
   *
   * The event is emitted with the recorded close code (or 1006 when there is
   * none) and the recorded close message (or an empty string).
   *
   * @private
   */
  emitClose () {
    this.readyState = WebSocket.CLOSED;
    this.emit('close', this._closeCode || 1006, this._closeMessage || '');

    if (this.extensions[PerMessageDeflate.extensionName]) {
      this.extensions[PerMessageDeflate.extensionName].cleanup();
    }

    this.extensions = null;

    this.removeAllListeners();
    this.on('error', constants.NOOP); // Catch all errors after this.
  }

  /**
   * Pause the socket stream.
   *
   * @throws {Error} If the connection is not in the `OPEN` state
   * @public
   */
  pause () {
    if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');

    this._socket.pause();
  }

  /**
   * Resume the socket stream.
   *
   * @throws {Error} If the connection is not in the `OPEN` state
   * @public
   */
  resume () {
    if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');

    this._socket.resume();
  }

  /**
   * Start a closing handshake.
   *
   * Does nothing when already `CLOSED`. While `CONNECTING`, the pending
   * handshake request (if any) is aborted instead.
   *
   * @param {Number} code Status code explaining why the connection is closing
   * @param {String} data A string explaining why the connection is closing
   * @public
   */
  close (code, data) {
    if (this.readyState === WebSocket.CLOSED) return;

    if (this.readyState === WebSocket.CONNECTING) {
      abortHandshake(this);
      return;
    }

    if (this.readyState === WebSocket.CLOSING) {
      // A close code has already been recorded: just end the socket.
      if (this._closeCode && this._socket) this._socket.end();
      return;
    }

    this.readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      if (err) this.emit('error', err);

      if (this._socket) {
        if (this._closeCode) this._socket.end();

        //
        // Ensure that the connection is cleaned up even when the closing
        // handshake fails.
        //
        clearTimeout(this._closeTimer);
        this._closeTimer = setTimeout(this._finalize, closeTimeout, true);
      }
    });
  }

  /**
   * Send a ping message.
   *
   * @param {*} data The message to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Boolean} failSilently Indicates whether or not to throw if `readyState` isn't `OPEN`
   * @throws {Error} If the connection is not `OPEN` and `failSilently` is falsy
   * @public
   */
  ping (data, mask, failSilently) {
    if (this.readyState !== WebSocket.OPEN) {
      if (failSilently) return;
      throw new Error('not opened');
    }

    if (typeof data === 'number') data = data.toString();
    // By default only the client side masks its frames.
    if (mask === undefined) mask = !this._isServer;

    this._sender.ping(data || constants.EMPTY_BUFFER, mask);
  }

  /**
   * Send a pong message.
   *
   * @param {*} data The message to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Boolean} failSilently Indicates whether or not to throw if `readyState` isn't `OPEN`
   * @throws {Error} If the connection is not `OPEN` and `failSilently` is falsy
   * @public
   */
  pong (data, mask, failSilently) {
    if (this.readyState !== WebSocket.OPEN) {
      if (failSilently) return;
      throw new Error('not opened');
    }

    if (typeof data === 'number') data = data.toString();
    // By default only the client side masks its frames.
    if (mask === undefined) mask = !this._isServer;

    this._sender.pong(data || constants.EMPTY_BUFFER, mask);
  }

  /**
   * Send a data message.
   *
   * When the connection is not `OPEN` the error is passed to `cb` if one was
   * given, otherwise it is thrown.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} options.compress Specifies whether or not to compress `data`
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback which is executed when data is written out
   * @throws {Error} If the connection is not `OPEN` and no callback is given
   * @public
   */
  send (data, options, cb) {
    if (typeof options === 'function') {
      cb = options;
      options = {};
    }

    if (this.readyState !== WebSocket.OPEN) {
      if (cb) cb(new Error('not opened'));
      else throw new Error('not opened');
      return;
    }

    if (typeof data === 'number') data = data.toString();

    const opts = Object.assign({
      binary: typeof data !== 'string',
      mask: !this._isServer,
      compress: true,
      fin: true
    }, options);

    // Compression is only possible when permessage-deflate was negotiated.
    if (!this.extensions[PerMessageDeflate.extensionName]) {
      opts.compress = false;
    }

    this._sender.send(data || constants.EMPTY_BUFFER, opts, cb);
  }

  /**
   * Forcibly close the connection.
   *
   * Does nothing when already `CLOSED`. While `CONNECTING`, the pending
   * handshake request (if any) is aborted instead.
   *
   * @public
   */
  terminate () {
    if (this.readyState === WebSocket.CLOSED) return;

    if (this.readyState === WebSocket.CONNECTING) {
      abortHandshake(this);
      return;
    }

    this.finalize(true);
  }
}

WebSocket.CONNECTING = 0;
WebSocket.OPEN = 1;
WebSocket.CLOSING = 2;
WebSocket.CLOSED = 3;

//
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
['open', 'error', 'close', 'message'].forEach((method) => {
  Object.defineProperty(WebSocket.prototype, `on${method}`, {
    /**
     * Return the listener of the event.
     *
     * @return {(Function|undefined)} The event listener or `undefined`
     * @public
     */
    get () {
      const listeners = this.listeners(method);

      for (let i = 0; i < listeners.length; i++) {
        if (listeners[i]._listener) return listeners[i]._listener;
      }
    },
    /**
     * Add a listener for the event, replacing the ones that carry a
     * `_listener` property.
     *
     * @param {Function} listener The listener to add
     * @public
     */
    set (listener) {
      const listeners = this.listeners(method);

      for (let i = 0; i < listeners.length; i++) {
        //
        // Remove only the listeners added via `addEventListener`.
        //
        if (listeners[i]._listener) this.removeListener(method, listeners[i]);
      }

      this.addEventListener(method, listener);
    }
  });
});

WebSocket.prototype.addEventListener = EventTarget.addEventListener;
WebSocket.prototype.removeEventListener = EventTarget.removeEventListener;

module.exports = WebSocket;

/**
 * Abort a client handshake that has not completed yet.
 *
 * Does nothing when there is no pending request or when the request was
 * already aborted. Otherwise the request is aborted, an `error` event is
 * emitted and the WebSocket is finalized as an abnormal closure.
 *
 * @param {WebSocket} websocket The WebSocket whose handshake must be aborted
 * @private
 */
function abortHandshake (websocket) {
  if (!websocket._req || websocket._req.aborted) return;

  websocket._req.abort();
  websocket.emit('error', new Error('closed before the connection is established'));
  websocket.finalize(true);
}

/**
 * Initialize a WebSocket server client.
 *
 * Must be invoked with `this` bound to the `WebSocket` instance.
 *
 * @param {http.IncomingMessage} req The request object
 * @param {net.Socket} socket The network socket between the server and client
 * @param {Buffer} head The first packet of the upgraded stream
 * @param {Object} options WebSocket attributes
 * @param {Number} options.protocolVersion The WebSocket protocol version
 * @param {Object} options.extensions The negotiated extensions
 * @param {Number} options.maxPayload The maximum allowed message size
 * @param {String} options.protocol The chosen subprotocol
 * @private
 */
function initAsServerClient (req, socket, head, options) {
  this.protocolVersion = options.protocolVersion;
  this.extensions = options.extensions;
  this.maxPayload = options.maxPayload;
  this.protocol = options.protocol;

  this.upgradeReq = req;
  this._isServer = true;

  this.setSocket(socket, head);
}

/**
 * Initialize a WebSocket client.
 *
 * Must be invoked with `this` bound to the `WebSocket` instance. Builds the
 * handshake request, sends it and wires the `error`, `response` and `upgrade`
 * handlers of the request.
 *
 * @param {String} address The URL to which to connect
 * @param {String[]} protocols The list of subprotocols
 * @param {Object} options Connection options
 * @param {String} options.protocol Value of the `Sec-WebSocket-Protocol` header
 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable permessage-deflate
 * @param {String} options.localAddress Local interface to bind for network connections
 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version` header
 * @param {Object} options.headers An object containing request headers
 * @param {String} options.origin Value of the `Origin` or `Sec-WebSocket-Origin` header
 * @param {http.Agent} options.agent Use the specified Agent
 * @param {String} options.host Value of the `Host` header
 * @param {Number} options.family IP address family to use during hostname lookup (4 or 6).
 * @param {Function} options.checkServerIdentity A function to validate the server hostname
 * @param {Boolean} options.rejectUnauthorized Verify or not the server certificate
 * @param {String} options.passphrase The passphrase for the private key or pfx
 * @param {String} options.ciphers The ciphers to use or exclude
 * @param {(String|String[]|Buffer|Buffer[])} options.cert The certificate key
 * @param {(String|String[]|Buffer|Buffer[])} options.key The private key
 * @param {(String|Buffer)} options.pfx The private key, certificate, and CA certs
 * @param {(String|String[]|Buffer|Buffer[])} options.ca Trusted certificates
 * @throws {Error} If the protocol version is not supported or the URL is invalid
 * @private
 */
function initAsClient (address, protocols, options) {
  options = Object.assign({
    protocolVersion: protocolVersions[1],
    protocol: protocols.join(','),
    perMessageDeflate: true,
    localAddress: null,
    headers: null,
    family: null,
    origin: null,
    agent: null,
    host: null,

    //
    // SSL options.
    //
    checkServerIdentity: null,
    rejectUnauthorized: null,
    passphrase: null,
    ciphers: null,
    cert: null,
    key: null,
    pfx: null,
    ca: null
  }, options);

  if (protocolVersions.indexOf(options.protocolVersion) === -1) {
    throw new Error(
      `unsupported protocol version: ${options.protocolVersion} ` +
      `(supported versions: ${protocolVersions.join(', ')})`
    );
  }

  this.protocolVersion = options.protocolVersion;
  this._isServer = false;
  this.url = address;

  const serverUrl = url.parse(address);
  const isUnixSocket = serverUrl.protocol === 'ws+unix:';

  // A `ws+unix:` URL has no host; it only needs a path.
  if (!serverUrl.host && (!isUnixSocket || !serverUrl.path)) {
    throw new Error('invalid url');
  }

  const isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:';
  const key = crypto.randomBytes(16).toString('base64');
  const httpObj = isSecure ? https : http;

  //
  // Prepare extensions.
  //
  const extensionsOffer = {};
  let perMessageDeflate;

  if (options.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      options.perMessageDeflate !== true ? options.perMessageDeflate : {},
      false
    );
    extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer();
  }

  const requestOptions = {
    port: serverUrl.port || (isSecure ? 443 : 80),
    host: serverUrl.hostname,
    path: '/',
    headers: {
      'Sec-WebSocket-Version': options.protocolVersion,
      'Sec-WebSocket-Key': key,
      'Connection': 'Upgrade',
      'Upgrade': 'websocket'
    }
  };

  if (options.headers) Object.assign(requestOptions.headers, options.headers);
  if (Object.keys(extensionsOffer).length) {
    requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer);
  }
  if (options.protocol) {
    requestOptions.headers['Sec-WebSocket-Protocol'] = options.protocol;
  }
  if (options.origin) {
    // The origin header has a different name for protocol versions below 13.
    if (options.protocolVersion < 13) {
      requestOptions.headers['Sec-WebSocket-Origin'] = options.origin;
    } else {
      requestOptions.headers.Origin = options.origin;
    }
  }
  if (options.host) requestOptions.headers.Host = options.host;
  if (serverUrl.auth) requestOptions.auth = serverUrl.auth;

  if (options.localAddress) requestOptions.localAddress = options.localAddress;
  if (options.family) requestOptions.family = options.family;

  if (isUnixSocket) {
    // The URL path has the form `<socket path>:<request path>`.
    const parts = serverUrl.path.split(':');

    requestOptions.socketPath = parts[0];
    requestOptions.path = parts[1];
  } else if (serverUrl.path) {
    //
    // Make sure that path starts with `/`.
    //
    if (serverUrl.path.charAt(0) !== '/') {
      requestOptions.path = `/${serverUrl.path}`;
    } else {
      requestOptions.path = serverUrl.path;
    }
  }

  let agent = options.agent;

  //
  // A custom agent is required for these options.
  //
  if (
    options.rejectUnauthorized != null ||
    options.checkServerIdentity ||
    options.passphrase ||
    options.ciphers ||
    options.cert ||
    options.key ||
    options.pfx ||
    options.ca
  ) {
    if (options.passphrase) requestOptions.passphrase = options.passphrase;
    if (options.ciphers) requestOptions.ciphers = options.ciphers;
    if (options.cert) requestOptions.cert = options.cert;
    if (options.key) requestOptions.key = options.key;
    if (options.pfx) requestOptions.pfx = options.pfx;
    if (options.ca) requestOptions.ca = options.ca;
    if (options.checkServerIdentity) {
      requestOptions.checkServerIdentity = options.checkServerIdentity;
    }
    if (options.rejectUnauthorized != null) {
      requestOptions.rejectUnauthorized = options.rejectUnauthorized;
    }

    if (!agent) agent = new httpObj.Agent(requestOptions);
  }

  if (agent) requestOptions.agent = agent;

  this._req = httpObj.get(requestOptions);

  this._req.on('error', (error) => {
    //
    // `_req` is reset to `null` once the request has failed or has been
    // upgraded. Ignore errors reported after that point, as well as the ones
    // caused by aborting the request, instead of dereferencing `null`.
    //
    if (!this._req || this._req.aborted) return;

    this._req = null;
    this.emit('error', error);
    this.finalize(true);
  });

  this._req.on('response', (res) => {
    //
    // A regular HTTP response means the upgrade was refused. Fail the
    // connection unless an `unexpected-response` listener handled it.
    //
    if (!this.emit('unexpected-response', this._req, res)) {
      this._req.abort();
      this.emit('error', new Error(`unexpected server response (${res.statusCode})`));
      this.finalize(true);
    }
  });

  this._req.on('upgrade', (res, socket, head) => {
    this._req = null;

    // The expected `Sec-WebSocket-Accept` value for the key that was sent.
    const digest = crypto.createHash('sha1')
      .update(key + constants.GUID, 'binary')
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      socket.destroy();
      this.emit('error', new Error('invalid server key'));
      return this.finalize(true);
    }

    const serverProt = res.headers['sec-websocket-protocol'];
    const protList = (options.protocol || '').split(/, */);
    let protError;

    if (!options.protocol && serverProt) {
      protError = 'server sent a subprotocol even though none requested';
    } else if (options.protocol && !serverProt) {
      protError = 'server sent no subprotocol even though requested';
    } else if (serverProt && protList.indexOf(serverProt) === -1) {
      protError = 'server responded with an invalid protocol';
    }

    if (protError) {
      socket.destroy();
      this.emit('error', new Error(protError));
      return this.finalize(true);
    }

    if (serverProt) this.protocol = serverProt;

    const serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']);

    if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) {
      try {
        perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]);
      } catch (err) {
        socket.destroy();
        this.emit('error', new Error('invalid extension parameter'));
        return this.finalize(true);
      }

      this.extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
    }

    this.setSocket(socket, head);
  });
}