receiver.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  12. const FastBuffer = Buffer[Symbol.species];
  13. const GET_INFO = 0;
  14. const GET_PAYLOAD_LENGTH_16 = 1;
  15. const GET_PAYLOAD_LENGTH_64 = 2;
  16. const GET_MASK = 3;
  17. const GET_DATA = 4;
  18. const INFLATING = 5;
  19. const DEFER_EVENT = 6;
  20. /**
  21. * HyBi Receiver implementation.
  22. *
  23. * @extends Writable
  24. */
  25. class Receiver extends Writable {
  26. /**
  27. * Creates a Receiver instance.
  28. *
  29. * @param {Object} [options] Options object
  30. * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
  31. * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
  32. * multiple times in the same tick
  33. * @param {String} [options.binaryType=nodebuffer] The type for binary data
  34. * @param {Object} [options.extensions] An object containing the negotiated
  35. * extensions
  36. * @param {Boolean} [options.isServer=false] Specifies whether to operate in
  37. * client or server mode
  38. * @param {Number} [options.maxPayload=0] The maximum allowed message length
  39. * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
  40. * not to skip UTF-8 validation for text and close messages
  41. */
  42. constructor(options = {}) {
  43. super();
  44. this._allowSynchronousEvents =
  45. options.allowSynchronousEvents !== undefined
  46. ? options.allowSynchronousEvents
  47. : true;
  48. this._binaryType = options.binaryType || BINARY_TYPES[0];
  49. this._extensions = options.extensions || {};
  50. this._isServer = !!options.isServer;
  51. this._maxPayload = options.maxPayload | 0;
  52. this._skipUTF8Validation = !!options.skipUTF8Validation;
  53. this[kWebSocket] = undefined;
  54. this._bufferedBytes = 0;
  55. this._buffers = [];
  56. this._compressed = false;
  57. this._payloadLength = 0;
  58. this._mask = undefined;
  59. this._fragmented = 0;
  60. this._masked = false;
  61. this._fin = false;
  62. this._opcode = 0;
  63. this._totalPayloadLength = 0;
  64. this._messageLength = 0;
  65. this._fragments = [];
  66. this._errored = false;
  67. this._loop = false;
  68. this._state = GET_INFO;
  69. }
  70. /**
  71. * Implements `Writable.prototype._write()`.
  72. *
  73. * @param {Buffer} chunk The chunk of data to write
  74. * @param {String} encoding The character encoding of `chunk`
  75. * @param {Function} cb Callback
  76. * @private
  77. */
  78. _write(chunk, encoding, cb) {
  79. if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
  80. this._bufferedBytes += chunk.length;
  81. this._buffers.push(chunk);
  82. this.startLoop(cb);
  83. }
  84. /**
  85. * Consumes `n` bytes from the buffered data.
  86. *
  87. * @param {Number} n The number of bytes to consume
  88. * @return {Buffer} The consumed bytes
  89. * @private
  90. */
  91. consume(n) {
  92. this._bufferedBytes -= n;
  93. if (n === this._buffers[0].length) return this._buffers.shift();
  94. if (n < this._buffers[0].length) {
  95. const buf = this._buffers[0];
  96. this._buffers[0] = new FastBuffer(
  97. buf.buffer,
  98. buf.byteOffset + n,
  99. buf.length - n
  100. );
  101. return new FastBuffer(buf.buffer, buf.byteOffset, n);
  102. }
  103. const dst = Buffer.allocUnsafe(n);
  104. do {
  105. const buf = this._buffers[0];
  106. const offset = dst.length - n;
  107. if (n >= buf.length) {
  108. dst.set(this._buffers.shift(), offset);
  109. } else {
  110. dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
  111. this._buffers[0] = new FastBuffer(
  112. buf.buffer,
  113. buf.byteOffset + n,
  114. buf.length - n
  115. );
  116. }
  117. n -= buf.length;
  118. } while (n > 0);
  119. return dst;
  120. }
  121. /**
  122. * Starts the parsing loop.
  123. *
  124. * @param {Function} cb Callback
  125. * @private
  126. */
  127. startLoop(cb) {
  128. this._loop = true;
  129. do {
  130. switch (this._state) {
  131. case GET_INFO:
  132. this.getInfo(cb);
  133. break;
  134. case GET_PAYLOAD_LENGTH_16:
  135. this.getPayloadLength16(cb);
  136. break;
  137. case GET_PAYLOAD_LENGTH_64:
  138. this.getPayloadLength64(cb);
  139. break;
  140. case GET_MASK:
  141. this.getMask();
  142. break;
  143. case GET_DATA:
  144. this.getData(cb);
  145. break;
  146. case INFLATING:
  147. case DEFER_EVENT:
  148. this._loop = false;
  149. return;
  150. }
  151. } while (this._loop);
  152. if (!this._errored) cb();
  153. }
  154. /**
  155. * Reads the first two bytes of a frame.
  156. *
  157. * @param {Function} cb Callback
  158. * @private
  159. */
  160. getInfo(cb) {
  161. if (this._bufferedBytes < 2) {
  162. this._loop = false;
  163. return;
  164. }
  165. const buf = this.consume(2);
  166. if ((buf[0] & 0x30) !== 0x00) {
  167. const error = this.createError(
  168. RangeError,
  169. 'RSV2 and RSV3 must be clear',
  170. true,
  171. 1002,
  172. 'WS_ERR_UNEXPECTED_RSV_2_3'
  173. );
  174. cb(error);
  175. return;
  176. }
  177. const compressed = (buf[0] & 0x40) === 0x40;
  178. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  179. const error = this.createError(
  180. RangeError,
  181. 'RSV1 must be clear',
  182. true,
  183. 1002,
  184. 'WS_ERR_UNEXPECTED_RSV_1'
  185. );
  186. cb(error);
  187. return;
  188. }
  189. this._fin = (buf[0] & 0x80) === 0x80;
  190. this._opcode = buf[0] & 0x0f;
  191. this._payloadLength = buf[1] & 0x7f;
  192. if (this._opcode === 0x00) {
  193. if (compressed) {
  194. const error = this.createError(
  195. RangeError,
  196. 'RSV1 must be clear',
  197. true,
  198. 1002,
  199. 'WS_ERR_UNEXPECTED_RSV_1'
  200. );
  201. cb(error);
  202. return;
  203. }
  204. if (!this._fragmented) {
  205. const error = this.createError(
  206. RangeError,
  207. 'invalid opcode 0',
  208. true,
  209. 1002,
  210. 'WS_ERR_INVALID_OPCODE'
  211. );
  212. cb(error);
  213. return;
  214. }
  215. this._opcode = this._fragmented;
  216. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  217. if (this._fragmented) {
  218. const error = this.createError(
  219. RangeError,
  220. `invalid opcode ${this._opcode}`,
  221. true,
  222. 1002,
  223. 'WS_ERR_INVALID_OPCODE'
  224. );
  225. cb(error);
  226. return;
  227. }
  228. this._compressed = compressed;
  229. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  230. if (!this._fin) {
  231. const error = this.createError(
  232. RangeError,
  233. 'FIN must be set',
  234. true,
  235. 1002,
  236. 'WS_ERR_EXPECTED_FIN'
  237. );
  238. cb(error);
  239. return;
  240. }
  241. if (compressed) {
  242. const error = this.createError(
  243. RangeError,
  244. 'RSV1 must be clear',
  245. true,
  246. 1002,
  247. 'WS_ERR_UNEXPECTED_RSV_1'
  248. );
  249. cb(error);
  250. return;
  251. }
  252. if (
  253. this._payloadLength > 0x7d ||
  254. (this._opcode === 0x08 && this._payloadLength === 1)
  255. ) {
  256. const error = this.createError(
  257. RangeError,
  258. `invalid payload length ${this._payloadLength}`,
  259. true,
  260. 1002,
  261. 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
  262. );
  263. cb(error);
  264. return;
  265. }
  266. } else {
  267. const error = this.createError(
  268. RangeError,
  269. `invalid opcode ${this._opcode}`,
  270. true,
  271. 1002,
  272. 'WS_ERR_INVALID_OPCODE'
  273. );
  274. cb(error);
  275. return;
  276. }
  277. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  278. this._masked = (buf[1] & 0x80) === 0x80;
  279. if (this._isServer) {
  280. if (!this._masked) {
  281. const error = this.createError(
  282. RangeError,
  283. 'MASK must be set',
  284. true,
  285. 1002,
  286. 'WS_ERR_EXPECTED_MASK'
  287. );
  288. cb(error);
  289. return;
  290. }
  291. } else if (this._masked) {
  292. const error = this.createError(
  293. RangeError,
  294. 'MASK must be clear',
  295. true,
  296. 1002,
  297. 'WS_ERR_UNEXPECTED_MASK'
  298. );
  299. cb(error);
  300. return;
  301. }
  302. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  303. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  304. else this.haveLength(cb);
  305. }
  306. /**
  307. * Gets extended payload length (7+16).
  308. *
  309. * @param {Function} cb Callback
  310. * @private
  311. */
  312. getPayloadLength16(cb) {
  313. if (this._bufferedBytes < 2) {
  314. this._loop = false;
  315. return;
  316. }
  317. this._payloadLength = this.consume(2).readUInt16BE(0);
  318. this.haveLength(cb);
  319. }
  320. /**
  321. * Gets extended payload length (7+64).
  322. *
  323. * @param {Function} cb Callback
  324. * @private
  325. */
  326. getPayloadLength64(cb) {
  327. if (this._bufferedBytes < 8) {
  328. this._loop = false;
  329. return;
  330. }
  331. const buf = this.consume(8);
  332. const num = buf.readUInt32BE(0);
  333. //
  334. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  335. // if payload length is greater than this number.
  336. //
  337. if (num > Math.pow(2, 53 - 32) - 1) {
  338. const error = this.createError(
  339. RangeError,
  340. 'Unsupported WebSocket frame: payload length > 2^53 - 1',
  341. false,
  342. 1009,
  343. 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
  344. );
  345. cb(error);
  346. return;
  347. }
  348. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
  349. this.haveLength(cb);
  350. }
  351. /**
  352. * Payload length has been read.
  353. *
  354. * @param {Function} cb Callback
  355. * @private
  356. */
  357. haveLength(cb) {
  358. if (this._payloadLength && this._opcode < 0x08) {
  359. this._totalPayloadLength += this._payloadLength;
  360. if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
  361. const error = this.createError(
  362. RangeError,
  363. 'Max payload size exceeded',
  364. false,
  365. 1009,
  366. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  367. );
  368. cb(error);
  369. return;
  370. }
  371. }
  372. if (this._masked) this._state = GET_MASK;
  373. else this._state = GET_DATA;
  374. }
  375. /**
  376. * Reads mask bytes.
  377. *
  378. * @private
  379. */
  380. getMask() {
  381. if (this._bufferedBytes < 4) {
  382. this._loop = false;
  383. return;
  384. }
  385. this._mask = this.consume(4);
  386. this._state = GET_DATA;
  387. }
  388. /**
  389. * Reads data bytes.
  390. *
  391. * @param {Function} cb Callback
  392. * @private
  393. */
  394. getData(cb) {
  395. let data = EMPTY_BUFFER;
  396. if (this._payloadLength) {
  397. if (this._bufferedBytes < this._payloadLength) {
  398. this._loop = false;
  399. return;
  400. }
  401. data = this.consume(this._payloadLength);
  402. if (
  403. this._masked &&
  404. (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
  405. ) {
  406. unmask(data, this._mask);
  407. }
  408. }
  409. if (this._opcode > 0x07) {
  410. this.controlMessage(data, cb);
  411. return;
  412. }
  413. if (this._compressed) {
  414. this._state = INFLATING;
  415. this.decompress(data, cb);
  416. return;
  417. }
  418. if (data.length) {
  419. //
  420. // This message is not compressed so its length is the sum of the payload
  421. // length of all fragments.
  422. //
  423. this._messageLength = this._totalPayloadLength;
  424. this._fragments.push(data);
  425. }
  426. this.dataMessage(cb);
  427. }
  428. /**
  429. * Decompresses data.
  430. *
  431. * @param {Buffer} data Compressed data
  432. * @param {Function} cb Callback
  433. * @private
  434. */
  435. decompress(data, cb) {
  436. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  437. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  438. if (err) return cb(err);
  439. if (buf.length) {
  440. this._messageLength += buf.length;
  441. if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
  442. const error = this.createError(
  443. RangeError,
  444. 'Max payload size exceeded',
  445. false,
  446. 1009,
  447. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  448. );
  449. cb(error);
  450. return;
  451. }
  452. this._fragments.push(buf);
  453. }
  454. this.dataMessage(cb);
  455. if (this._state === GET_INFO) this.startLoop(cb);
  456. });
  457. }
  458. /**
  459. * Handles a data message.
  460. *
  461. * @param {Function} cb Callback
  462. * @private
  463. */
  464. dataMessage(cb) {
  465. if (!this._fin) {
  466. this._state = GET_INFO;
  467. return;
  468. }
  469. const messageLength = this._messageLength;
  470. const fragments = this._fragments;
  471. this._totalPayloadLength = 0;
  472. this._messageLength = 0;
  473. this._fragmented = 0;
  474. this._fragments = [];
  475. if (this._opcode === 2) {
  476. let data;
  477. if (this._binaryType === 'nodebuffer') {
  478. data = concat(fragments, messageLength);
  479. } else if (this._binaryType === 'arraybuffer') {
  480. data = toArrayBuffer(concat(fragments, messageLength));
  481. } else {
  482. data = fragments;
  483. }
  484. if (this._allowSynchronousEvents) {
  485. this.emit('message', data, true);
  486. this._state = GET_INFO;
  487. } else {
  488. this._state = DEFER_EVENT;
  489. setImmediate(() => {
  490. this.emit('message', data, true);
  491. this._state = GET_INFO;
  492. this.startLoop(cb);
  493. });
  494. }
  495. } else {
  496. const buf = concat(fragments, messageLength);
  497. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  498. const error = this.createError(
  499. Error,
  500. 'invalid UTF-8 sequence',
  501. true,
  502. 1007,
  503. 'WS_ERR_INVALID_UTF8'
  504. );
  505. cb(error);
  506. return;
  507. }
  508. if (this._state === INFLATING || this._allowSynchronousEvents) {
  509. this.emit('message', buf, false);
  510. this._state = GET_INFO;
  511. } else {
  512. this._state = DEFER_EVENT;
  513. setImmediate(() => {
  514. this.emit('message', buf, false);
  515. this._state = GET_INFO;
  516. this.startLoop(cb);
  517. });
  518. }
  519. }
  520. }
  521. /**
  522. * Handles a control message.
  523. *
  524. * @param {Buffer} data Data to handle
  525. * @return {(Error|RangeError|undefined)} A possible error
  526. * @private
  527. */
  528. controlMessage(data, cb) {
  529. if (this._opcode === 0x08) {
  530. if (data.length === 0) {
  531. this._loop = false;
  532. this.emit('conclude', 1005, EMPTY_BUFFER);
  533. this.end();
  534. } else {
  535. const code = data.readUInt16BE(0);
  536. if (!isValidStatusCode(code)) {
  537. const error = this.createError(
  538. RangeError,
  539. `invalid status code ${code}`,
  540. true,
  541. 1002,
  542. 'WS_ERR_INVALID_CLOSE_CODE'
  543. );
  544. cb(error);
  545. return;
  546. }
  547. const buf = new FastBuffer(
  548. data.buffer,
  549. data.byteOffset + 2,
  550. data.length - 2
  551. );
  552. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  553. const error = this.createError(
  554. Error,
  555. 'invalid UTF-8 sequence',
  556. true,
  557. 1007,
  558. 'WS_ERR_INVALID_UTF8'
  559. );
  560. cb(error);
  561. return;
  562. }
  563. this._loop = false;
  564. this.emit('conclude', code, buf);
  565. this.end();
  566. }
  567. this._state = GET_INFO;
  568. return;
  569. }
  570. if (this._allowSynchronousEvents) {
  571. this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
  572. this._state = GET_INFO;
  573. } else {
  574. this._state = DEFER_EVENT;
  575. setImmediate(() => {
  576. this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
  577. this._state = GET_INFO;
  578. this.startLoop(cb);
  579. });
  580. }
  581. }
  582. /**
  583. * Builds an error object.
  584. *
  585. * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
  586. * @param {String} message The error message
  587. * @param {Boolean} prefix Specifies whether or not to add a default prefix to
  588. * `message`
  589. * @param {Number} statusCode The status code
  590. * @param {String} errorCode The exposed error code
  591. * @return {(Error|RangeError)} The error
  592. * @private
  593. */
  594. createError(ErrorCtor, message, prefix, statusCode, errorCode) {
  595. this._loop = false;
  596. this._errored = true;
  597. const err = new ErrorCtor(
  598. prefix ? `Invalid WebSocket frame: ${message}` : message
  599. );
  600. Error.captureStackTrace(err, this.createError);
  601. err.code = errorCode;
  602. err[kStatusCode] = statusCode;
  603. return err;
  604. }
  605. }
  606. module.exports = Receiver;