broadcast-operator.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RemoteSocket = exports.BroadcastOperator = void 0;
  4. const socket_1 = require("./socket");
  5. const socket_io_parser_1 = require("socket.io-parser");
  6. class BroadcastOperator {
  7. constructor(adapter, rooms = new Set(), exceptRooms = new Set(), flags = {}) {
  8. this.adapter = adapter;
  9. this.rooms = rooms;
  10. this.exceptRooms = exceptRooms;
  11. this.flags = flags;
  12. }
  13. /**
  14. * Targets a room when emitting.
  15. *
  16. * @example
  17. * // the “foo” event will be broadcast to all connected clients in the “room-101” room
  18. * io.to("room-101").emit("foo", "bar");
  19. *
  20. * // with an array of rooms (a client will be notified at most once)
  21. * io.to(["room-101", "room-102"]).emit("foo", "bar");
  22. *
  23. * // with multiple chained calls
  24. * io.to("room-101").to("room-102").emit("foo", "bar");
  25. *
  26. * @param room - a room, or an array of rooms
  27. * @return a new {@link BroadcastOperator} instance for chaining
  28. */
  29. to(room) {
  30. const rooms = new Set(this.rooms);
  31. if (Array.isArray(room)) {
  32. room.forEach((r) => rooms.add(r));
  33. }
  34. else {
  35. rooms.add(room);
  36. }
  37. return new BroadcastOperator(this.adapter, rooms, this.exceptRooms, this.flags);
  38. }
  39. /**
  40. * Targets a room when emitting. Similar to `to()`, but might feel clearer in some cases:
  41. *
  42. * @example
  43. * // disconnect all clients in the "room-101" room
  44. * io.in("room-101").disconnectSockets();
  45. *
  46. * @param room - a room, or an array of rooms
  47. * @return a new {@link BroadcastOperator} instance for chaining
  48. */
  49. in(room) {
  50. return this.to(room);
  51. }
  52. /**
  53. * Excludes a room when emitting.
  54. *
  55. * @example
  56. * // the "foo" event will be broadcast to all connected clients, except the ones that are in the "room-101" room
  57. * io.except("room-101").emit("foo", "bar");
  58. *
  59. * // with an array of rooms
  60. * io.except(["room-101", "room-102"]).emit("foo", "bar");
  61. *
  62. * // with multiple chained calls
  63. * io.except("room-101").except("room-102").emit("foo", "bar");
  64. *
  65. * @param room - a room, or an array of rooms
  66. * @return a new {@link BroadcastOperator} instance for chaining
  67. */
  68. except(room) {
  69. const exceptRooms = new Set(this.exceptRooms);
  70. if (Array.isArray(room)) {
  71. room.forEach((r) => exceptRooms.add(r));
  72. }
  73. else {
  74. exceptRooms.add(room);
  75. }
  76. return new BroadcastOperator(this.adapter, this.rooms, exceptRooms, this.flags);
  77. }
  78. /**
  79. * Sets the compress flag.
  80. *
  81. * @example
  82. * io.compress(false).emit("hello");
  83. *
  84. * @param compress - if `true`, compresses the sending data
  85. * @return a new BroadcastOperator instance
  86. */
  87. compress(compress) {
  88. const flags = Object.assign({}, this.flags, { compress });
  89. return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
  90. }
  91. /**
  92. * Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
  93. * receive messages (because of network slowness or other issues, or because they’re connected through long polling
  94. * and is in the middle of a request-response cycle).
  95. *
  96. * @example
  97. * io.volatile.emit("hello"); // the clients may or may not receive it
  98. *
  99. * @return a new BroadcastOperator instance
  100. */
  101. get volatile() {
  102. const flags = Object.assign({}, this.flags, { volatile: true });
  103. return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
  104. }
  105. /**
  106. * Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node.
  107. *
  108. * @example
  109. * // the “foo” event will be broadcast to all connected clients on this node
  110. * io.local.emit("foo", "bar");
  111. *
  112. * @return a new {@link BroadcastOperator} instance for chaining
  113. */
  114. get local() {
  115. const flags = Object.assign({}, this.flags, { local: true });
  116. return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
  117. }
  118. /**
  119. * Adds a timeout in milliseconds for the next operation
  120. *
  121. * @example
  122. * io.timeout(1000).emit("some-event", (err, responses) => {
  123. * if (err) {
  124. * // some clients did not acknowledge the event in the given delay
  125. * } else {
  126. * console.log(responses); // one response per client
  127. * }
  128. * });
  129. *
  130. * @param timeout
  131. */
  132. timeout(timeout) {
  133. const flags = Object.assign({}, this.flags, { timeout });
  134. return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
  135. }
  136. /**
  137. * Emits to all clients.
  138. *
  139. * @example
  140. * // the “foo” event will be broadcast to all connected clients
  141. * io.emit("foo", "bar");
  142. *
  143. * // the “foo” event will be broadcast to all connected clients in the “room-101” room
  144. * io.to("room-101").emit("foo", "bar");
  145. *
  146. * // with an acknowledgement expected from all connected clients
  147. * io.timeout(1000).emit("some-event", (err, responses) => {
  148. * if (err) {
  149. * // some clients did not acknowledge the event in the given delay
  150. * } else {
  151. * console.log(responses); // one response per client
  152. * }
  153. * });
  154. *
  155. * @return Always true
  156. */
  157. emit(ev, ...args) {
  158. if (socket_1.RESERVED_EVENTS.has(ev)) {
  159. throw new Error(`"${String(ev)}" is a reserved event name`);
  160. }
  161. // set up packet object
  162. const data = [ev, ...args];
  163. const packet = {
  164. type: socket_io_parser_1.PacketType.EVENT,
  165. data: data,
  166. };
  167. const withAck = typeof data[data.length - 1] === "function";
  168. if (!withAck) {
  169. this.adapter.broadcast(packet, {
  170. rooms: this.rooms,
  171. except: this.exceptRooms,
  172. flags: this.flags,
  173. });
  174. return true;
  175. }
  176. const ack = data.pop();
  177. let timedOut = false;
  178. let responses = [];
  179. const timer = setTimeout(() => {
  180. timedOut = true;
  181. ack.apply(this, [
  182. new Error("operation has timed out"),
  183. this.flags.expectSingleResponse ? null : responses,
  184. ]);
  185. }, this.flags.timeout);
  186. let expectedServerCount = -1;
  187. let actualServerCount = 0;
  188. let expectedClientCount = 0;
  189. const checkCompleteness = () => {
  190. if (!timedOut &&
  191. expectedServerCount === actualServerCount &&
  192. responses.length === expectedClientCount) {
  193. clearTimeout(timer);
  194. ack.apply(this, [
  195. null,
  196. this.flags.expectSingleResponse ? responses[0] : responses,
  197. ]);
  198. }
  199. };
  200. this.adapter.broadcastWithAck(packet, {
  201. rooms: this.rooms,
  202. except: this.exceptRooms,
  203. flags: this.flags,
  204. }, (clientCount) => {
  205. // each Socket.IO server in the cluster sends the number of clients that were notified
  206. expectedClientCount += clientCount;
  207. actualServerCount++;
  208. checkCompleteness();
  209. }, (clientResponse) => {
  210. // each client sends an acknowledgement
  211. responses.push(clientResponse);
  212. checkCompleteness();
  213. });
  214. this.adapter.serverCount().then((serverCount) => {
  215. expectedServerCount = serverCount;
  216. checkCompleteness();
  217. });
  218. return true;
  219. }
  220. /**
  221. * Emits an event and waits for an acknowledgement from all clients.
  222. *
  223. * @example
  224. * try {
  225. * const responses = await io.timeout(1000).emitWithAck("some-event");
  226. * console.log(responses); // one response per client
  227. * } catch (e) {
  228. * // some clients did not acknowledge the event in the given delay
  229. * }
  230. *
  231. * @return a Promise that will be fulfilled when all clients have acknowledged the event
  232. */
  233. emitWithAck(ev, ...args) {
  234. return new Promise((resolve, reject) => {
  235. args.push((err, responses) => {
  236. if (err) {
  237. err.responses = responses;
  238. return reject(err);
  239. }
  240. else {
  241. return resolve(responses);
  242. }
  243. });
  244. this.emit(ev, ...args);
  245. });
  246. }
  247. /**
  248. * Gets a list of clients.
  249. *
  250. * @deprecated this method will be removed in the next major release, please use {@link Server#serverSideEmit} or
  251. * {@link fetchSockets} instead.
  252. */
  253. allSockets() {
  254. if (!this.adapter) {
  255. throw new Error("No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?");
  256. }
  257. return this.adapter.sockets(this.rooms);
  258. }
  259. /**
  260. * Returns the matching socket instances. This method works across a cluster of several Socket.IO servers.
  261. *
  262. * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
  263. *
  264. * @example
  265. * // return all Socket instances
  266. * const sockets = await io.fetchSockets();
  267. *
  268. * // return all Socket instances in the "room1" room
  269. * const sockets = await io.in("room1").fetchSockets();
  270. *
  271. * for (const socket of sockets) {
  272. * console.log(socket.id);
  273. * console.log(socket.handshake);
  274. * console.log(socket.rooms);
  275. * console.log(socket.data);
  276. *
  277. * socket.emit("hello");
  278. * socket.join("room1");
  279. * socket.leave("room2");
  280. * socket.disconnect();
  281. * }
  282. */
  283. fetchSockets() {
  284. return this.adapter
  285. .fetchSockets({
  286. rooms: this.rooms,
  287. except: this.exceptRooms,
  288. flags: this.flags,
  289. })
  290. .then((sockets) => {
  291. return sockets.map((socket) => {
  292. if (socket instanceof socket_1.Socket) {
  293. // FIXME the TypeScript compiler complains about missing private properties
  294. return socket;
  295. }
  296. else {
  297. return new RemoteSocket(this.adapter, socket);
  298. }
  299. });
  300. });
  301. }
  302. /**
  303. * Makes the matching socket instances join the specified rooms.
  304. *
  305. * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
  306. *
  307. * @example
  308. *
  309. * // make all socket instances join the "room1" room
  310. * io.socketsJoin("room1");
  311. *
  312. * // make all socket instances in the "room1" room join the "room2" and "room3" rooms
  313. * io.in("room1").socketsJoin(["room2", "room3"]);
  314. *
  315. * @param room - a room, or an array of rooms
  316. */
  317. socketsJoin(room) {
  318. this.adapter.addSockets({
  319. rooms: this.rooms,
  320. except: this.exceptRooms,
  321. flags: this.flags,
  322. }, Array.isArray(room) ? room : [room]);
  323. }
  324. /**
  325. * Makes the matching socket instances leave the specified rooms.
  326. *
  327. * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
  328. *
  329. * @example
  330. * // make all socket instances leave the "room1" room
  331. * io.socketsLeave("room1");
  332. *
  333. * // make all socket instances in the "room1" room leave the "room2" and "room3" rooms
  334. * io.in("room1").socketsLeave(["room2", "room3"]);
  335. *
  336. * @param room - a room, or an array of rooms
  337. */
  338. socketsLeave(room) {
  339. this.adapter.delSockets({
  340. rooms: this.rooms,
  341. except: this.exceptRooms,
  342. flags: this.flags,
  343. }, Array.isArray(room) ? room : [room]);
  344. }
  345. /**
  346. * Makes the matching socket instances disconnect.
  347. *
  348. * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
  349. *
  350. * @example
  351. * // make all socket instances disconnect (the connections might be kept alive for other namespaces)
  352. * io.disconnectSockets();
  353. *
  354. * // make all socket instances in the "room1" room disconnect and close the underlying connections
  355. * io.in("room1").disconnectSockets(true);
  356. *
  357. * @param close - whether to close the underlying connection
  358. */
  359. disconnectSockets(close = false) {
  360. this.adapter.disconnectSockets({
  361. rooms: this.rooms,
  362. except: this.exceptRooms,
  363. flags: this.flags,
  364. }, close);
  365. }
  366. }
  367. exports.BroadcastOperator = BroadcastOperator;
  368. /**
  369. * Expose of subset of the attributes and methods of the Socket class
  370. */
  371. class RemoteSocket {
  372. constructor(adapter, details) {
  373. this.id = details.id;
  374. this.handshake = details.handshake;
  375. this.rooms = new Set(details.rooms);
  376. this.data = details.data;
  377. this.operator = new BroadcastOperator(adapter, new Set([this.id]), new Set(), {
  378. expectSingleResponse: true, // so that remoteSocket.emit() with acknowledgement behaves like socket.emit()
  379. });
  380. }
  381. /**
  382. * Adds a timeout in milliseconds for the next operation.
  383. *
  384. * @example
  385. * const sockets = await io.fetchSockets();
  386. *
  387. * for (const socket of sockets) {
  388. * if (someCondition) {
  389. * socket.timeout(1000).emit("some-event", (err) => {
  390. * if (err) {
  391. * // the client did not acknowledge the event in the given delay
  392. * }
  393. * });
  394. * }
  395. * }
  396. *
  397. * // note: if possible, using a room instead of looping over all sockets is preferable
  398. * io.timeout(1000).to(someConditionRoom).emit("some-event", (err, responses) => {
  399. * // ...
  400. * });
  401. *
  402. * @param timeout
  403. */
  404. timeout(timeout) {
  405. return this.operator.timeout(timeout);
  406. }
  407. emit(ev, ...args) {
  408. return this.operator.emit(ev, ...args);
  409. }
  410. /**
  411. * Joins a room.
  412. *
  413. * @param {String|Array} room - room or array of rooms
  414. */
  415. join(room) {
  416. return this.operator.socketsJoin(room);
  417. }
  418. /**
  419. * Leaves a room.
  420. *
  421. * @param {String} room
  422. */
  423. leave(room) {
  424. return this.operator.socketsLeave(room);
  425. }
  426. /**
  427. * Disconnects this client.
  428. *
  429. * @param {Boolean} close - if `true`, closes the underlying connection
  430. * @return {Socket} self
  431. */
  432. disconnect(close = false) {
  433. this.operator.disconnectSockets(close);
  434. return this;
  435. }
  436. }
  437. exports.RemoteSocket = RemoteSocket;