|
- "use strict";
/**
 * Object-rest helper (`const { a, ...others } = source`).
 *
 * Copies every own enumerable property of `source` (string keys first, then
 * symbol keys) into a fresh object, skipping the keys listed in `excluded`.
 * A `null`/`undefined` source yields an empty object. An already defined
 * `this.__rest` takes precedence over this implementation.
 */
var __rest = (this && this.__rest) || function (source, excluded) {
    var result = {};
    var key;
    for (key in source) {
        // `for...in` also walks the prototype chain, hence the own-property check
        if (Object.prototype.hasOwnProperty.call(source, key) && excluded.indexOf(key) < 0) {
            result[key] = source[key];
        }
    }
    if (source != null && typeof Object.getOwnPropertySymbols === "function") {
        var symbols = Object.getOwnPropertySymbols(source);
        for (var index = 0; index < symbols.length; index++) {
            var symbol = symbols[index];
            // getOwnPropertySymbols() also returns non-enumerable symbols, which must be skipped
            if (excluded.indexOf(symbol) < 0 && Object.prototype.propertyIsEnumerable.call(source, symbol)) {
                result[symbol] = source[symbol];
            }
        }
    }
    return result;
};
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.ClusterAdapterWithHeartbeat = exports.ClusterAdapter = exports.MessageType = void 0;
- const in_memory_adapter_1 = require("./in-memory-adapter");
- const debug_1 = require("debug");
- const crypto_1 = require("crypto");
// logger for this module, under the "socket.io-adapter" debug namespace
const debug = (0, debug_1.debug)("socket.io-adapter");
// sender UID that is never tracked as a cluster node (see ClusterAdapterWithHeartbeat#onMessage)
const EMITTER_UID = "emitter";
// fallback delay, in milliseconds, after which a pending cluster request is given up
const DEFAULT_TIMEOUT = 5 * 1000;
/**
 * Generates a random identifier: 8 random bytes rendered as 16 hexadecimal characters.
 *
 * @returns {string} the hex-encoded identifier
 */
function randomId() {
    const bytes = crypto_1.randomBytes(8);
    return bytes.toString("hex");
}
/**
 * Types of the messages exchanged between the members of the cluster.
 *
 * The object maps in both directions: name => numeric code and code => name.
 * Codes start at 1 and follow the order of the list below.
 */
var MessageType;
(function (types) {
    const orderedNames = [
        "INITIAL_HEARTBEAT",
        "HEARTBEAT",
        "BROADCAST",
        "SOCKETS_JOIN",
        "SOCKETS_LEAVE",
        "DISCONNECT_SOCKETS",
        "FETCH_SOCKETS",
        "FETCH_SOCKETS_RESPONSE",
        "SERVER_SIDE_EMIT",
        "SERVER_SIDE_EMIT_RESPONSE",
        "BROADCAST_CLIENT_COUNT",
        "BROADCAST_ACK",
        "ADAPTER_CLOSE",
    ];
    orderedNames.forEach((name, position) => {
        const code = position + 1;
        types[name] = code;
        types[code] = name;
    });
})(MessageType = exports.MessageType || (exports.MessageType = {}));
/**
 * Converts broadcast options into a form that can be sent to the other nodes:
 * the `rooms` and `except` collections are copied into plain arrays, while
 * `flags` is passed through untouched.
 *
 * @param opts - options holding iterable `rooms` and `except` collections
 * @returns an object with array-based `rooms` / `except` and the original `flags`
 */
function encodeOptions(opts) {
    const encoded = {};
    encoded.rooms = [...opts.rooms];
    encoded.except = [...opts.except];
    encoded.flags = opts.flags;
    return encoded;
}
/**
 * Reverse of the encoding step: rebuilds `Set` instances from the `rooms`
 * and `except` lists of received options, passing `flags` through untouched.
 *
 * @param opts - options as received from another node
 * @returns an object with Set-based `rooms` / `except` and the original `flags`
 */
function decodeOptions(opts) {
    const decoded = {};
    decoded.rooms = new Set(opts.rooms);
    decoded.except = new Set(opts.except);
    decoded.flags = opts.flags;
    return decoded;
}
/**
 * A cluster-ready adapter. Any extending class must:
 *
 * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse}
 *   (both must return a promise, since rejections are handled with `.catch()`)
 * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
 */
class ClusterAdapter extends in_memory_adapter_1.Adapter {
    /**
     * @param nsp - the namespace this adapter is attached to
     */
    constructor(nsp) {
        super(nsp);
        // pending fetchSockets() / serverSideEmit() requests, by request ID
        this.requests = new Map();
        // pending broadcastWithAck() requests, by request ID
        this.ackRequests = new Map();
        // identifier of this node, used to recognize (and ignore) its own messages
        this.uid = randomId();
    }
    /**
     * Called when receiving a message from another member of the cluster.
     *
     * @param message
     * @param offset - appended to the broadcast packet when connection state recovery is enabled
     * @protected
     */
    onMessage(message, offset) {
        if (message.uid === this.uid) {
            return debug("[%s] ignore message from self", this.uid);
        }
        debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid);
        switch (message.type) {
            case MessageType.BROADCAST: {
                // a request ID means that the sender expects acknowledgements from the clients
                const withAck = message.data.requestId !== undefined;
                if (withAck) {
                    super.broadcastWithAck(message.data.packet, decodeOptions(message.data.opts), (clientCount) => {
                        debug("[%s] waiting for %d client acknowledgements", this.uid, clientCount);
                        this.publishResponse(message.uid, {
                            type: MessageType.BROADCAST_CLIENT_COUNT,
                            data: {
                                requestId: message.data.requestId,
                                clientCount,
                            },
                        });
                    }, (arg) => {
                        debug("[%s] received acknowledgement with value %j", this.uid, arg);
                        this.publishResponse(message.uid, {
                            type: MessageType.BROADCAST_ACK,
                            data: {
                                requestId: message.data.requestId,
                                packet: arg,
                            },
                        });
                    });
                }
                else {
                    const packet = message.data.packet;
                    const opts = decodeOptions(message.data.opts);
                    this.addOffsetIfNecessary(packet, opts, offset);
                    super.broadcast(packet, opts);
                }
                break;
            }
            case MessageType.SOCKETS_JOIN:
                super.addSockets(decodeOptions(message.data.opts), message.data.rooms);
                break;
            case MessageType.SOCKETS_LEAVE:
                super.delSockets(decodeOptions(message.data.opts), message.data.rooms);
                break;
            case MessageType.DISCONNECT_SOCKETS:
                super.disconnectSockets(decodeOptions(message.data.opts), message.data.close);
                break;
            case MessageType.FETCH_SOCKETS: {
                debug("[%s] calling fetchSockets with opts %j", this.uid, message.data.opts);
                super
                    .fetchSockets(decodeOptions(message.data.opts))
                    .then((localSockets) => {
                        this.publishResponse(message.uid, {
                            type: MessageType.FETCH_SOCKETS_RESPONSE,
                            data: {
                                requestId: message.data.requestId,
                                sockets: localSockets.map((socket) => {
                                    // remove sessionStore from handshake, as it may contain circular references
                                    const handshake = __rest(socket.handshake, ["sessionStore"]);
                                    return {
                                        id: socket.id,
                                        handshake,
                                        rooms: [...socket.rooms],
                                        data: socket.data,
                                    };
                                }),
                            },
                        });
                    })
                    .catch((err) => {
                        // without this handler, a failure would surface as an unhandled rejection
                        debug("[%s] error while fetching sockets: %s", this.uid, err);
                    });
                break;
            }
            case MessageType.SERVER_SIDE_EMIT: {
                const packet = message.data.packet;
                const withAck = message.data.requestId !== undefined;
                if (!withAck) {
                    this.nsp._onServerSideEmit(packet);
                    return;
                }
                let called = false;
                const callback = (arg) => {
                    // only one argument is expected
                    if (called) {
                        return;
                    }
                    called = true;
                    debug("[%s] calling acknowledgement with %j", this.uid, arg);
                    this.publishResponse(message.uid, {
                        type: MessageType.SERVER_SIDE_EMIT_RESPONSE,
                        data: {
                            requestId: message.data.requestId,
                            packet: arg,
                        },
                    });
                };
                this.nsp._onServerSideEmit([...packet, callback]);
                break;
            }
            case MessageType.BROADCAST_CLIENT_COUNT:
            case MessageType.BROADCAST_ACK:
            case MessageType.FETCH_SOCKETS_RESPONSE:
            case MessageType.SERVER_SIDE_EMIT_RESPONSE:
                // extending classes may not make a distinction between a ClusterMessage and a ClusterResponse payload and may
                // always call the onMessage() method
                this.onResponse(message);
                break;
            default:
                debug("[%s] unknown message type: %s", this.uid, message.type);
        }
    }
    /**
     * Called when receiving a response from another member of the cluster.
     *
     * Responses whose request ID is unknown (already completed, timed out or
     * cleaned up) are silently dropped.
     *
     * @param response
     * @protected
     */
    onResponse(response) {
        const requestId = response.data.requestId;
        debug("[%s] received response %s to request %s", this.uid, response.type, requestId);
        switch (response.type) {
            case MessageType.BROADCAST_CLIENT_COUNT: {
                const ackRequest = this.ackRequests.get(requestId);
                if (ackRequest) {
                    ackRequest.clientCountCallback(response.data.clientCount);
                }
                break;
            }
            case MessageType.BROADCAST_ACK: {
                const ackRequest = this.ackRequests.get(requestId);
                if (ackRequest) {
                    ackRequest.ack(response.data.packet);
                }
                break;
            }
            case MessageType.FETCH_SOCKETS_RESPONSE: {
                const request = this.requests.get(requestId);
                if (!request) {
                    return;
                }
                request.current++;
                response.data.sockets.forEach((socket) => request.responses.push(socket));
                // the request completes once every expected node has answered
                if (request.current === request.expected) {
                    clearTimeout(request.timeout);
                    request.resolve(request.responses);
                    this.requests.delete(requestId);
                }
                break;
            }
            case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
                const request = this.requests.get(requestId);
                if (!request) {
                    return;
                }
                request.current++;
                request.responses.push(response.data.packet);
                if (request.current === request.expected) {
                    clearTimeout(request.timeout);
                    // `resolve` is the user acknowledgement here: (err, responses)
                    request.resolve(null, request.responses);
                    this.requests.delete(requestId);
                }
                break;
            }
            default:
                debug("[%s] unknown response type: %s", this.uid, response.type);
        }
    }
    /**
     * Broadcasts a packet to the other nodes (unless the `local` flag is set),
     * then to the local clients.
     *
     * If publishing to the cluster fails, the error is logged and the local
     * broadcast is skipped as well.
     *
     * @param packet
     * @param opts - broadcast options (`rooms`, `except`, optional `flags`)
     */
    async broadcast(packet, opts) {
        const onlyLocal = opts.flags && opts.flags.local;
        if (!onlyLocal) {
            try {
                const offset = await this.publishAndReturnOffset({
                    type: MessageType.BROADCAST,
                    data: {
                        packet,
                        opts: encodeOptions(opts),
                    },
                });
                this.addOffsetIfNecessary(packet, opts, offset);
            }
            catch (e) {
                return debug("[%s] error while broadcasting message: %s", this.uid, e.message);
            }
        }
        super.broadcast(packet, opts);
    }
    /**
     * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it
     * reconnects after a temporary disconnection.
     *
     * Nothing is added unless connection state recovery is enabled on the server.
     *
     * @param packet
     * @param opts
     * @param offset
     * @private
     */
    addOffsetIfNecessary(packet, opts, offset) {
        if (!this.nsp.server.opts.connectionStateRecovery) {
            return;
        }
        const isEventPacket = packet.type === 2;
        // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
        // restored on another server upon reconnection
        const withoutAcknowledgement = packet.id === undefined;
        const notVolatile = !opts.flags || opts.flags.volatile === undefined;
        if (isEventPacket && withoutAcknowledgement && notVolatile) {
            packet.data.push(offset);
        }
    }
    /**
     * Broadcasts a packet and expects an acknowledgement from each client, on
     * the other nodes (unless the `local` flag is set) and locally.
     *
     * @param packet
     * @param opts - broadcast options (`rooms`, `except`, optional `flags`)
     * @param clientCountCallback - called with the number of clients reached by a node
     * @param ack - called for each client acknowledgement
     */
    broadcastWithAck(packet, opts, clientCountCallback, ack) {
        // `flags` is optional, so never dereference it directly
        const flags = (opts && opts.flags) || {};
        if (!flags.local) {
            const requestId = randomId();
            this.ackRequests.set(requestId, {
                clientCountCallback,
                ack,
            });
            this.publish({
                type: MessageType.BROADCAST,
                data: {
                    packet,
                    requestId,
                    opts: encodeOptions(opts),
                },
            });
            // we have no way to know at this level whether the server has received an acknowledgement from each client, so we
            // will simply clean up the ackRequests map after the given delay (an absent delay would make the timer fire
            // almost immediately and drop the remote acknowledgements, hence the fallback)
            const cleanupDelay = flags.timeout != null ? flags.timeout : DEFAULT_TIMEOUT;
            setTimeout(() => {
                this.ackRequests.delete(requestId);
            }, cleanupDelay);
        }
        super.broadcastWithAck(packet, opts, clientCountCallback, ack);
    }
    /**
     * Makes the matching sockets join the given rooms, on the other nodes
     * (unless the `local` flag is set) and locally. A publish failure is
     * logged and does not prevent the local operation.
     *
     * @param opts
     * @param rooms
     */
    async addSockets(opts, rooms) {
        const onlyLocal = opts.flags && opts.flags.local;
        if (!onlyLocal) {
            try {
                await this.publishAndReturnOffset({
                    type: MessageType.SOCKETS_JOIN,
                    data: {
                        opts: encodeOptions(opts),
                        rooms,
                    },
                });
            }
            catch (e) {
                debug("[%s] error while publishing message: %s", this.uid, e.message);
            }
        }
        super.addSockets(opts, rooms);
    }
    /**
     * Makes the matching sockets leave the given rooms, on the other nodes
     * (unless the `local` flag is set) and locally. A publish failure is
     * logged and does not prevent the local operation.
     *
     * @param opts
     * @param rooms
     */
    async delSockets(opts, rooms) {
        const onlyLocal = opts.flags && opts.flags.local;
        if (!onlyLocal) {
            try {
                await this.publishAndReturnOffset({
                    type: MessageType.SOCKETS_LEAVE,
                    data: {
                        opts: encodeOptions(opts),
                        rooms,
                    },
                });
            }
            catch (e) {
                debug("[%s] error while publishing message: %s", this.uid, e.message);
            }
        }
        super.delSockets(opts, rooms);
    }
    /**
     * Disconnects the matching sockets, on the other nodes (unless the `local`
     * flag is set) and locally. A publish failure is logged and does not
     * prevent the local operation.
     *
     * @param opts
     * @param close - forwarded as-is to the other nodes and to the parent implementation
     */
    async disconnectSockets(opts, close) {
        const onlyLocal = opts.flags && opts.flags.local;
        if (!onlyLocal) {
            try {
                await this.publishAndReturnOffset({
                    type: MessageType.DISCONNECT_SOCKETS,
                    data: {
                        opts: encodeOptions(opts),
                        close,
                    },
                });
            }
            catch (e) {
                debug("[%s] error while publishing message: %s", this.uid, e.message);
            }
        }
        super.disconnectSockets(opts, close);
    }
    /**
     * Returns the matching sockets of the whole cluster (or only the local
     * ones when the `local` flag is set or when there is no other node).
     *
     * @param opts - options (`rooms`, `except`, optional `flags` with `local` / `timeout`)
     * @returns a promise resolved with the local sockets followed by the remote ones,
     *          or rejected if some nodes have not answered within the timeout
     */
    async fetchSockets(opts) {
        const [localSockets, serverCount] = await Promise.all([
            super.fetchSockets(opts),
            this.serverCount(),
        ]);
        const expectedResponseCount = serverCount - 1;
        // `flags` is optional, so never dereference it directly
        const flags = opts.flags || {};
        if (flags.local || expectedResponseCount <= 0) {
            return localSockets;
        }
        const requestId = randomId();
        return new Promise((resolve, reject) => {
            const timeout = setTimeout(() => {
                const storedRequest = this.requests.get(requestId);
                if (storedRequest) {
                    reject(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`));
                    this.requests.delete(requestId);
                }
            }, flags.timeout || DEFAULT_TIMEOUT);
            const storedRequest = {
                type: MessageType.FETCH_SOCKETS,
                resolve,
                timeout,
                current: 0,
                expected: expectedResponseCount,
                // remote sockets are appended to this array as the responses come in
                responses: localSockets,
            };
            this.requests.set(requestId, storedRequest);
            this.publish({
                type: MessageType.FETCH_SOCKETS,
                data: {
                    opts: encodeOptions(opts),
                    requestId,
                },
            });
        });
    }
    /**
     * Sends a packet to the other nodes of the cluster.
     *
     * When the last element of `packet` is a function, it is removed from the
     * array and used as acknowledgement: it is called with `(null, responses)`
     * once every other node has answered, or with an error and the partial
     * responses once the default timeout has elapsed.
     *
     * @param packet - the arguments to emit, optionally followed by an acknowledgement function
     */
    async serverSideEmit(packet) {
        const withAck = typeof packet[packet.length - 1] === "function";
        if (!withAck) {
            return this.publish({
                type: MessageType.SERVER_SIDE_EMIT,
                data: {
                    packet,
                },
            });
        }
        const ack = packet.pop();
        const expectedResponseCount = (await this.serverCount()) - 1;
        debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount);
        if (expectedResponseCount <= 0) {
            return ack(null, []);
        }
        const requestId = randomId();
        const timeout = setTimeout(() => {
            const storedRequest = this.requests.get(requestId);
            if (storedRequest) {
                ack(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`), storedRequest.responses);
                this.requests.delete(requestId);
            }
        }, DEFAULT_TIMEOUT);
        const storedRequest = {
            type: MessageType.SERVER_SIDE_EMIT,
            resolve: ack,
            timeout,
            current: 0,
            expected: expectedResponseCount,
            responses: [],
        };
        this.requests.set(requestId, storedRequest);
        this.publish({
            type: MessageType.SERVER_SIDE_EMIT,
            data: {
                requestId,
                packet,
            },
        });
    }
    /**
     * Sends a message to the other nodes, fire-and-forget: a failure is only logged.
     *
     * @param message
     */
    publish(message) {
        this.publishAndReturnOffset(message).catch((err) => {
            debug("[%s] error while publishing message: %s", this.uid, err);
        });
    }
    /**
     * Stamps the message with the UID of this node and the name of the
     * namespace, then hands it over to `doPublish()`.
     *
     * @param message
     * @returns the promise returned by `doPublish()`
     */
    publishAndReturnOffset(message) {
        message.uid = this.uid;
        message.nsp = this.nsp.name;
        return this.doPublish(message);
    }
    /**
     * Stamps the response with the UID of this node and the name of the
     * namespace, then hands it over to `doPublishResponse()`. A failure is only logged.
     *
     * @param requesterUid - the UID of the node that sent the request
     * @param response
     */
    publishResponse(requesterUid, response) {
        response.uid = this.uid;
        response.nsp = this.nsp.name;
        this.doPublishResponse(requesterUid, response).catch((err) => {
            debug("[%s] error while publishing response: %s", this.uid, err);
        });
    }
}
exports.ClusterAdapter = ClusterAdapter;
/**
 * A cluster adapter that keeps track of the other nodes of the cluster.
 *
 * Every message received from another node refreshes its "last seen"
 * timestamp; a node that stays silent for longer than `heartbeatTimeout` is
 * considered down and removed. This node publishes a heartbeat whenever it
 * has not published anything for `heartbeatInterval`. Pending requests wait
 * for the specific set of nodes known when the request was sent, so a node
 * leaving the cluster completes them early instead of forcing a timeout.
 */
class ClusterAdapterWithHeartbeat extends ClusterAdapter {
    /**
     * @param nsp - the namespace this adapter is attached to
     * @param opts - optional overrides for `heartbeatInterval` (default 5000 ms)
     *               and `heartbeatTimeout` (default 10000 ms)
     */
    constructor(nsp, opts) {
        super(nsp);
        this.nodesMap = new Map(); // uid => timestamp of last message
        // pending fetchSockets() / serverSideEmit() requests, by request ID
        this.customRequests = new Map();
        this._opts = Object.assign({
            heartbeatInterval: 5000,
            heartbeatTimeout: 10000,
        }, opts);
        // once per second, drop the nodes that have been silent for too long
        this.cleanupTimer = setInterval(() => {
            const now = Date.now();
            this.nodesMap.forEach((lastSeen, uid) => {
                const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
                if (nodeSeemsDown) {
                    debug("[%s] node %s seems down", this.uid, uid);
                    this.removeNode(uid);
                }
            });
        }, 1000);
    }
    /**
     * Announces this node to the cluster: the nodes that receive the initial
     * heartbeat answer with a heartbeat of their own (see `onMessage()`).
     */
    init() {
        this.publish({
            type: MessageType.INITIAL_HEARTBEAT,
        });
    }
    /**
     * (Re)arms the heartbeat timer, so that a heartbeat is only published
     * after `heartbeatInterval` without any other outgoing message.
     */
    scheduleHeartbeat() {
        if (this.heartbeatTimer) {
            this.heartbeatTimer.refresh();
        }
        else {
            this.heartbeatTimer = setTimeout(() => {
                this.publish({
                    type: MessageType.HEARTBEAT,
                });
            }, this._opts.heartbeatInterval);
        }
    }
    /**
     * Notifies the other nodes that this adapter is closing, then stops the
     * heartbeat and cleanup timers.
     */
    close() {
        // publish first: publish() re-arms the heartbeat timer, which is cleared right after
        this.publish({
            type: MessageType.ADAPTER_CLOSE,
        });
        clearTimeout(this.heartbeatTimer);
        if (this.cleanupTimer) {
            clearInterval(this.cleanupTimer);
        }
    }
    /**
     * Called when receiving a message from another member of the cluster.
     * Handles the heartbeat-related messages and delegates the others to the
     * parent class.
     *
     * @param message
     * @param offset
     */
    onMessage(message, offset) {
        if (message.uid === this.uid) {
            return debug("[%s] ignore message from self", this.uid);
        }
        if (message.uid && message.uid !== EMITTER_UID) {
            // we track the UID of each sender, in order to know how many servers there are in the cluster
            this.nodesMap.set(message.uid, Date.now());
        }
        debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid);
        switch (message.type) {
            case MessageType.INITIAL_HEARTBEAT:
                // answer right away so that the sender learns about this node
                this.publish({
                    type: MessageType.HEARTBEAT,
                });
                break;
            case MessageType.HEARTBEAT:
                // nothing to do
                break;
            case MessageType.ADAPTER_CLOSE:
                this.removeNode(message.uid);
                break;
            default:
                super.onMessage(message, offset);
        }
    }
    /**
     * @returns a promise resolved with the number of nodes in the cluster:
     *          this node plus every node currently tracked
     */
    serverCount() {
        return Promise.resolve(1 + this.nodesMap.size);
    }
    /**
     * Sends a message to the other nodes and postpones the next heartbeat,
     * since any outgoing message already shows that this node is alive.
     *
     * @param message
     */
    publish(message) {
        this.scheduleHeartbeat();
        return super.publish(message);
    }
    /**
     * Sends a packet to the other nodes of the cluster.
     *
     * When the last element of `packet` is a function, it is removed from the
     * array and used as acknowledgement: it is called with `(null, responses)`
     * once every node known at the time of the call has answered or left, or
     * with an error and the partial responses once the default timeout has elapsed.
     *
     * @param packet - the arguments to emit, optionally followed by an acknowledgement function
     */
    async serverSideEmit(packet) {
        const withAck = typeof packet[packet.length - 1] === "function";
        if (!withAck) {
            return this.publish({
                type: MessageType.SERVER_SIDE_EMIT,
                data: {
                    packet,
                },
            });
        }
        const ack = packet.pop();
        const expectedResponseCount = this.nodesMap.size;
        debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount);
        if (expectedResponseCount <= 0) {
            return ack(null, []);
        }
        const requestId = randomId();
        const timeout = setTimeout(() => {
            const storedRequest = this.customRequests.get(requestId);
            if (storedRequest) {
                ack(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`), storedRequest.responses);
                this.customRequests.delete(requestId);
            }
        }, DEFAULT_TIMEOUT);
        const storedRequest = {
            type: MessageType.SERVER_SIDE_EMIT,
            resolve: ack,
            timeout,
            // snapshot of the nodes whose response is still awaited
            missingUids: new Set([...this.nodesMap.keys()]),
            responses: [],
        };
        this.customRequests.set(requestId, storedRequest);
        this.publish({
            type: MessageType.SERVER_SIDE_EMIT,
            data: {
                requestId,
                packet,
            },
        });
    }
    /**
     * Returns the matching sockets of the whole cluster (or only the local
     * ones when the `local` flag is set or when no other node is known).
     *
     * @param opts - options (`rooms`, `except`, optional `flags` with `local` / `timeout`)
     * @returns a promise resolved with the local sockets followed by the remote ones,
     *          or rejected if some nodes have not answered within the timeout
     */
    async fetchSockets(opts) {
        const [localSockets, serverCount] = await Promise.all([
            // force the `local` flag so that the parent class only collects the local sockets
            super.fetchSockets({
                rooms: opts.rooms,
                except: opts.except,
                flags: {
                    local: true,
                },
            }),
            this.serverCount(),
        ]);
        const expectedResponseCount = serverCount - 1;
        // `flags` is optional, so never dereference it directly
        const flags = opts.flags || {};
        if (flags.local || expectedResponseCount <= 0) {
            return localSockets;
        }
        const requestId = randomId();
        return new Promise((resolve, reject) => {
            const timeout = setTimeout(() => {
                const storedRequest = this.customRequests.get(requestId);
                if (storedRequest) {
                    reject(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`));
                    this.customRequests.delete(requestId);
                }
            }, flags.timeout || DEFAULT_TIMEOUT);
            const storedRequest = {
                type: MessageType.FETCH_SOCKETS,
                resolve,
                timeout,
                // snapshot of the nodes whose response is still awaited
                missingUids: new Set([...this.nodesMap.keys()]),
                // remote sockets are appended to this array as the responses come in
                responses: localSockets,
            };
            this.customRequests.set(requestId, storedRequest);
            this.publish({
                type: MessageType.FETCH_SOCKETS,
                data: {
                    opts: encodeOptions(opts),
                    requestId,
                },
            });
        });
    }
    /**
     * Called when receiving a response from another member of the cluster.
     * Handles the responses to the requests tracked by this class and
     * delegates the others to the parent class.
     *
     * @param response
     */
    onResponse(response) {
        const requestId = response.data.requestId;
        debug("[%s] received response %s to request %s", this.uid, response.type, requestId);
        switch (response.type) {
            case MessageType.FETCH_SOCKETS_RESPONSE: {
                const request = this.customRequests.get(requestId);
                if (!request) {
                    return;
                }
                response.data.sockets.forEach((socket) => request.responses.push(socket));
                request.missingUids.delete(response.uid);
                if (request.missingUids.size === 0) {
                    clearTimeout(request.timeout);
                    request.resolve(request.responses);
                    this.customRequests.delete(requestId);
                }
                break;
            }
            case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
                const request = this.customRequests.get(requestId);
                if (!request) {
                    return;
                }
                request.responses.push(response.data.packet);
                request.missingUids.delete(response.uid);
                if (request.missingUids.size === 0) {
                    clearTimeout(request.timeout);
                    // `resolve` is the user acknowledgement here: (err, responses)
                    request.resolve(null, request.responses);
                    this.customRequests.delete(requestId);
                }
                break;
            }
            default:
                super.onResponse(response);
        }
    }
    /**
     * Forgets a node, and completes the pending requests that were only
     * waiting for a response from that node.
     *
     * @param uid - the UID of the node to remove
     */
    removeNode(uid) {
        this.customRequests.forEach((request, requestId) => {
            request.missingUids.delete(uid);
            if (request.missingUids.size === 0) {
                clearTimeout(request.timeout);
                if (request.type === MessageType.FETCH_SOCKETS) {
                    request.resolve(request.responses);
                }
                else if (request.type === MessageType.SERVER_SIDE_EMIT) {
                    request.resolve(null, request.responses);
                }
                this.customRequests.delete(requestId);
            }
        });
        this.nodesMap.delete(uid);
    }
}
exports.ClusterAdapterWithHeartbeat = ClusterAdapterWithHeartbeat;
|