sender.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex" }] */
  2. 'use strict';
  3. const { Duplex } = require('stream');
  4. const { randomFillSync } = require('crypto');
  5. const PerMessageDeflate = require('./permessage-deflate');
  6. const { EMPTY_BUFFER } = require('./constants');
  7. const { isValidStatusCode } = require('./validation');
  8. const { mask: applyMask, toBuffer } = require('./buffer-util');
  // Size, in bytes, of the pool of random bytes that masking keys are drawn
  // from.
  const RANDOM_POOL_SIZE = 8 * 1024;

  // The pool itself. It stays `undefined` until `Sender.frame()` needs a random
  // masking key for the first time.
  let randomPool;

  // Index of the next unused byte in `randomPool`. Starting at the pool size
  // marks the pool as exhausted, so the first masked frame fills it.
  let randomPoolPointer = RANDOM_POOL_SIZE;

  // Fallback 4-byte buffer for the masking key, used by `Sender.frame()` when
  // the caller does not supply `options.maskBuffer`.
  const maskBuffer = Buffer.alloc(4);

  // Property key under which the options objects built by `Sender` carry the
  // byte length of the payload.
  const kByteLength = Symbol('kByteLength');
  /**
   * HyBi Sender implementation.
   *
   * Builds WebSocket frames and writes them to the underlying socket. While a
   * message is being compressed, later operations are queued so that frames
   * are written in the order in which they were requested.
   */
  class Sender {
    /**
     * Creates a Sender instance.
     *
     * @param {Duplex} socket The connection socket
     * @param {Object} [extensions] An object containing the negotiated extensions
     * @param {Function} [generateMask] The function used to generate the masking
     *     key
     */
    constructor(socket, extensions, generateMask) {
      this._extensions = extensions || {};

      if (generateMask) {
        this._generateMask = generateMask;
        // Per-instance key buffer, so that a custom generator writes here
        // instead of into the module-level fallback buffer.
        this._maskBuffer = Buffer.alloc(4);
      }

      this._socket = socket;

      // Fragmentation state of the data message currently being sent.
      this._firstFragment = true;
      this._compress = false;

      // Compression state: bytes waiting to be written, whether a compression
      // is in flight, and the operations deferred until it completes.
      this._bufferedBytes = 0;
      this._deflating = false;
      this._queue = [];
    }

    /**
     * Frames a piece of data according to the HyBi WebSocket protocol.
     *
     * When masking is required and `options.readOnly` is not set, a `Buffer`
     * passed as `data` is masked in place.
     *
     * @param {(Buffer|String)} data The data to frame
     * @param {Object} options Options object
     * @param {Boolean} [options.fin=false] Specifies whether or not to set the
     *     FIN bit
     * @param {Function} [options.generateMask] The function used to generate the
     *     masking key
     * @param {Boolean} [options.mask=false] Specifies whether or not to mask
     *     `data`
     * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
     *     key
     * @param {Number} options.opcode The opcode
     * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
     *     modified
     * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
     *     RSV1 bit
     * @return {(Buffer|String)[]} The framed data: `[header, payload]`, or a
     *     single buffer holding both when the masked payload had to be copied
     * @public
     */
    static frame(data, options) {
      let mask;
      let merge = false;
      // Two header bytes; grows below for the masking key and extended length.
      let offset = 2;
      let skipMasking = false;

      if (options.mask) {
        mask = options.maskBuffer || maskBuffer;

        if (options.generateMask) {
          options.generateMask(mask);
        } else {
          if (randomPoolPointer === RANDOM_POOL_SIZE) {
            /* istanbul ignore else */
            if (randomPool === undefined) {
              //
              // This is lazily initialized because server-sent frames must not
              // be masked so it may never be used.
              //
              randomPool = Buffer.alloc(RANDOM_POOL_SIZE);
            }

            randomFillSync(randomPool, 0, RANDOM_POOL_SIZE);
            randomPoolPointer = 0;
          }

          mask[0] = randomPool[randomPoolPointer++];
          mask[1] = randomPool[randomPoolPointer++];
          mask[2] = randomPool[randomPoolPointer++];
          mask[3] = randomPool[randomPoolPointer++];
        }

        // An all-zero key leaves the payload unchanged, so masking is skipped.
        skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
        offset = 6;
      }

      let dataLength;

      if (typeof data === 'string') {
        if (
          (!options.mask || skipMasking) &&
          options[kByteLength] !== undefined
        ) {
          // No masking to apply and the byte length is already known: keep the
          // string as is and return it as the payload.
          dataLength = options[kByteLength];
        } else {
          data = Buffer.from(data);
          dataLength = data.length;
        }
      } else {
        dataLength = data.length;
        // A read-only buffer cannot be masked in place; the masked copy is
        // written into the same allocation as the header instead.
        merge = options.mask && options.readOnly && !skipMasking;
      }

      let payloadLength = dataLength;

      if (dataLength >= 65536) {
        // 127 announces an 8-byte extended payload length.
        offset += 8;
        payloadLength = 127;
      } else if (dataLength > 125) {
        // 126 announces a 2-byte extended payload length.
        offset += 2;
        payloadLength = 126;
      }

      const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);

      target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
      if (options.rsv1) target[0] |= 0x40;

      target[1] = payloadLength;

      if (payloadLength === 126) {
        target.writeUInt16BE(dataLength, 2);
      } else if (payloadLength === 127) {
        // The 8-byte length is written as two zero bytes followed by a 6-byte
        // big-endian integer.
        target[2] = target[3] = 0;
        target.writeUIntBE(dataLength, 4, 6);
      }

      if (!options.mask) return [target, data];

      // Set the MASK bit and store the key in the last four header bytes.
      target[1] |= 0x80;
      target[offset - 4] = mask[0];
      target[offset - 3] = mask[1];
      target[offset - 2] = mask[2];
      target[offset - 1] = mask[3];

      if (skipMasking) return [target, data];

      if (merge) {
        applyMask(data, mask, target, offset, dataLength);
        return [target];
      }

      applyMask(data, mask, data, 0, dataLength);
      return [target, data];
    }

    /**
     * Sends a close message to the other peer.
     *
     * @param {Number} [code] The status code component of the body
     * @param {(String|Buffer)} [data] The message component of the body
     * @param {Boolean} [mask=false] Specifies whether or not to mask the message
     * @param {Function} [cb] Callback
     * @throws {TypeError} If `code` is given but is not a valid status code
     * @throws {RangeError} If `data` is longer than 123 bytes
     * @public
     */
    close(code, data, mask, cb) {
      let buf;

      if (code === undefined) {
        buf = EMPTY_BUFFER;
      } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
        throw new TypeError('First argument must be a valid error code number');
      } else if (data === undefined || !data.length) {
        // Status code only.
        buf = Buffer.allocUnsafe(2);
        buf.writeUInt16BE(code, 0);
      } else {
        const length = Buffer.byteLength(data);

        if (length > 123) {
          throw new RangeError('The message must not be greater than 123 bytes');
        }

        // Two bytes of status code followed by the message.
        buf = Buffer.allocUnsafe(2 + length);
        buf.writeUInt16BE(code, 0);

        if (typeof data === 'string') {
          buf.write(data, 2);
        } else {
          buf.set(data, 2);
        }
      }

      const options = {
        [kByteLength]: buf.length,
        fin: true,
        generateMask: this._generateMask,
        mask,
        maskBuffer: this._maskBuffer,
        opcode: 0x08,
        readOnly: false,
        rsv1: false
      };

      if (this._deflating) {
        this.enqueue([this.dispatch, buf, false, options, cb]);
      } else {
        this.sendFrame(Sender.frame(buf, options), cb);
      }
    }

    /**
     * Sends a ping message to the other peer.
     *
     * @param {*} data The message to send
     * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
     * @param {Function} [cb] Callback
     * @throws {RangeError} If `data` is longer than 125 bytes
     * @public
     */
    ping(data, mask, cb) {
      this.sendPingOrPong(0x09, data, mask, cb);
    }

    /**
     * Sends a pong message to the other peer.
     *
     * @param {*} data The message to send
     * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
     * @param {Function} [cb] Callback
     * @throws {RangeError} If `data` is longer than 125 bytes
     * @public
     */
    pong(data, mask, cb) {
      this.sendPingOrPong(0x0a, data, mask, cb);
    }

    /**
     * Sends a ping or pong frame. Both frame types are built the same way and
     * differ only in the opcode.
     *
     * @param {Number} opcode The opcode, `0x09` for ping or `0x0a` for pong
     * @param {*} data The message to send
     * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
     * @param {Function} [cb] Callback
     * @throws {RangeError} If `data` is longer than 125 bytes
     * @private
     */
    sendPingOrPong(opcode, data, mask, cb) {
      let byteLength;
      let readOnly;

      if (typeof data === 'string') {
        byteLength = Buffer.byteLength(data);
        readOnly = false;
      } else {
        data = toBuffer(data);
        byteLength = data.length;
        // NOTE(review): `toBuffer.readOnly` is presumably updated by the call
        // above; confirm in `./buffer-util`.
        readOnly = toBuffer.readOnly;
      }

      if (byteLength > 125) {
        throw new RangeError('The data size must not be greater than 125 bytes');
      }

      const options = {
        [kByteLength]: byteLength,
        fin: true,
        generateMask: this._generateMask,
        mask,
        maskBuffer: this._maskBuffer,
        opcode,
        readOnly,
        rsv1: false
      };

      if (this._deflating) {
        this.enqueue([this.dispatch, data, false, options, cb]);
      } else {
        this.sendFrame(Sender.frame(data, options), cb);
      }
    }

    /**
     * Sends a data message to the other peer.
     *
     * @param {*} data The message to send
     * @param {Object} options Options object
     * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
     *     or text
     * @param {Boolean} [options.compress=false] Specifies whether or not to
     *     compress `data`
     * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
     *     last one
     * @param {Boolean} [options.mask=false] Specifies whether or not to mask
     *     `data`
     * @param {Function} [cb] Callback
     * @public
     */
    send(data, options, cb) {
      const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
      let opcode = options.binary ? 2 : 1;
      let rsv1 = options.compress;

      let byteLength;
      let readOnly;

      if (typeof data === 'string') {
        byteLength = Buffer.byteLength(data);
        readOnly = false;
      } else {
        data = toBuffer(data);
        byteLength = data.length;
        // NOTE(review): `toBuffer.readOnly` is presumably updated by the call
        // above; confirm in `./buffer-util`.
        readOnly = toBuffer.readOnly;
      }

      if (this._firstFragment) {
        this._firstFragment = false;

        // The size threshold is only consulted when this side negotiated
        // `*_no_context_takeover`.
        if (
          rsv1 &&
          perMessageDeflate &&
          perMessageDeflate.params[
            perMessageDeflate._isServer
              ? 'server_no_context_takeover'
              : 'client_no_context_takeover'
          ]
        ) {
          rsv1 = byteLength >= perMessageDeflate._threshold;
        }

        // Remembered so that the following fragments of this message are
        // dispatched with the same compression decision.
        this._compress = rsv1;
      } else {
        // Continuation frame: opcode 0 and RSV1 cleared.
        rsv1 = false;
        opcode = 0;
      }

      if (options.fin) this._firstFragment = true;

      if (perMessageDeflate) {
        const opts = {
          [kByteLength]: byteLength,
          fin: options.fin,
          generateMask: this._generateMask,
          mask: options.mask,
          maskBuffer: this._maskBuffer,
          opcode,
          readOnly,
          rsv1
        };

        if (this._deflating) {
          this.enqueue([this.dispatch, data, this._compress, opts, cb]);
        } else {
          this.dispatch(data, this._compress, opts, cb);
        }
      } else {
        this.sendFrame(
          Sender.frame(data, {
            [kByteLength]: byteLength,
            fin: options.fin,
            generateMask: this._generateMask,
            mask: options.mask,
            maskBuffer: this._maskBuffer,
            opcode,
            readOnly,
            rsv1: false
          }),
          cb
        );
      }
    }

    /**
     * Dispatches a message.
     *
     * @param {(Buffer|String)} data The message to send
     * @param {Boolean} [compress=false] Specifies whether or not to compress
     *     `data`
     * @param {Object} options Options object
     * @param {Boolean} [options.fin=false] Specifies whether or not to set the
     *     FIN bit
     * @param {Function} [options.generateMask] The function used to generate the
     *     masking key
     * @param {Boolean} [options.mask=false] Specifies whether or not to mask
     *     `data`
     * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
     *     key
     * @param {Number} options.opcode The opcode
     * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
     *     modified
     * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
     *     RSV1 bit
     * @param {Function} [cb] Callback
     * @private
     */
    dispatch(data, compress, options, cb) {
      if (!compress) {
        this.sendFrame(Sender.frame(data, options), cb);
        return;
      }

      const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

      this._bufferedBytes += options[kByteLength];
      this._deflating = true;
      // The first callback argument is ignored here.
      perMessageDeflate.compress(data, options.fin, (_, buf) => {
        if (this._socket.destroyed) {
          const err = new Error(
            'The socket was closed while data was being compressed'
          );

          if (typeof cb === 'function') cb(err);

          // Report the error to every queued operation. The queue itself is
          // not drained and `_deflating` stays set.
          for (let i = 0; i < this._queue.length; i++) {
            const params = this._queue[i];
            const callback = params[params.length - 1];

            if (typeof callback === 'function') callback(err);
          }

          return;
        }

        this._bufferedBytes -= options[kByteLength];
        this._deflating = false;
        // The frame now carries `buf`, the compression output, instead of
        // the caller's `data`.
        options.readOnly = false;
        this.sendFrame(Sender.frame(buf, options), cb);
        this.dequeue();
      });
    }

    /**
     * Executes queued send operations.
     *
     * The loop stops as soon as one of the executed operations starts a new
     * compression; the remaining operations stay queued.
     *
     * @private
     */
    dequeue() {
      while (!this._deflating && this._queue.length) {
        const params = this._queue.shift();

        // `params` is `[fn, data, compress, options, cb]`; undo the accounting
        // done by `enqueue()`.
        this._bufferedBytes -= params[3][kByteLength];
        Reflect.apply(params[0], this, params.slice(1));
      }
    }

    /**
     * Enqueues a send operation.
     *
     * @param {Array} params Send operation parameters, in the form
     *     `[fn, data, compress, options, cb]`
     * @private
     */
    enqueue(params) {
      this._bufferedBytes += params[3][kByteLength];
      this._queue.push(params);
    }

    /**
     * Sends a frame.
     *
     * @param {Buffer[]} list The frame to send
     * @param {Function} [cb] Callback
     * @private
     */
    sendFrame(list, cb) {
      if (list.length === 2) {
        // Header and payload are separate chunks; write both between `cork()`
        // and `uncork()`.
        this._socket.cork();
        this._socket.write(list[0]);
        this._socket.write(list[1], cb);
        this._socket.uncork();
      } else {
        this._socket.write(list[0], cb);
      }
    }
  }
  432. module.exports = Sender;