123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- "use strict";
- var __importDefault = (this && this.__importDefault) || function (mod) {
- return (mod && mod.__esModule) ? mod : { "default": mod };
- };
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.serveFile = exports.restoreAdapter = exports.patchAdapter = void 0;
- const socket_io_adapter_1 = require("socket.io-adapter");
- const fs_1 = require("fs");
- const debug_1 = __importDefault(require("debug"));
- const debug = (0, debug_1.default)("socket.io:adapter-uws");
- const SEPARATOR = "\x1f"; // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text
- const { addAll, del, broadcast } = socket_io_adapter_1.Adapter.prototype;
- function patchAdapter(app /* : TemplatedApp */) {
- socket_io_adapter_1.Adapter.prototype.addAll = function (id, rooms) {
- const isNew = !this.sids.has(id);
- addAll.call(this, id, rooms);
- const socket = this.nsp.sockets.get(id);
- if (!socket) {
- return;
- }
- if (socket.conn.transport.name === "websocket") {
- subscribe(this.nsp.name, socket, isNew, rooms);
- return;
- }
- if (isNew) {
- socket.conn.on("upgrade", () => {
- const rooms = this.sids.get(id);
- if (rooms) {
- subscribe(this.nsp.name, socket, isNew, rooms);
- }
- });
- }
- };
- socket_io_adapter_1.Adapter.prototype.del = function (id, room) {
- del.call(this, id, room);
- const socket = this.nsp.sockets.get(id);
- if (socket && socket.conn.transport.name === "websocket") {
- // @ts-ignore
- const sessionId = socket.conn.id;
- // @ts-ignore
- const websocket = socket.conn.transport.socket;
- const topic = `${this.nsp.name}${SEPARATOR}${room}`;
- debug("unsubscribe connection %s from topic %s", sessionId, topic);
- websocket.unsubscribe(topic);
- }
- };
- socket_io_adapter_1.Adapter.prototype.broadcast = function (packet, opts) {
- const useFastPublish = opts.rooms.size <= 1 && opts.except.size === 0;
- if (!useFastPublish) {
- broadcast.call(this, packet, opts);
- return;
- }
- const flags = opts.flags || {};
- const basePacketOpts = {
- preEncoded: true,
- volatile: flags.volatile,
- compress: flags.compress,
- };
- packet.nsp = this.nsp.name;
- const encodedPackets = this.encoder.encode(packet);
- const topic = opts.rooms.size === 0
- ? this.nsp.name
- : `${this.nsp.name}${SEPARATOR}${opts.rooms.keys().next().value}`;
- debug("fast publish to %s", topic);
- // fast publish for clients connected with WebSocket
- encodedPackets.forEach((encodedPacket) => {
- const isBinary = typeof encodedPacket !== "string";
- // "4" being the message type in the Engine.IO protocol, see https://github.com/socketio/engine.io-protocol
- app.publish(topic, isBinary ? encodedPacket : "4" + encodedPacket, isBinary);
- });
- this.apply(opts, (socket) => {
- if (socket.conn.transport.name !== "websocket") {
- // classic publish for clients connected with HTTP long-polling
- socket.client.writeToEngine(encodedPackets, basePacketOpts);
- }
- });
- };
- }
- exports.patchAdapter = patchAdapter;
- function subscribe(namespaceName, socket, isNew, rooms) {
- // @ts-ignore
- const sessionId = socket.conn.id;
- // @ts-ignore
- const websocket = socket.conn.transport.socket;
- if (isNew) {
- debug("subscribe connection %s to topic %s", sessionId, namespaceName);
- websocket.subscribe(namespaceName);
- }
- rooms.forEach((room) => {
- const topic = `${namespaceName}${SEPARATOR}${room}`; // '#' can be used as wildcard
- debug("subscribe connection %s to topic %s", sessionId, topic);
- websocket.subscribe(topic);
- });
- }
- function restoreAdapter() {
- socket_io_adapter_1.Adapter.prototype.addAll = addAll;
- socket_io_adapter_1.Adapter.prototype.del = del;
- socket_io_adapter_1.Adapter.prototype.broadcast = broadcast;
- }
- exports.restoreAdapter = restoreAdapter;
- const toArrayBuffer = (buffer) => {
- const { buffer: arrayBuffer, byteOffset, byteLength } = buffer;
- return arrayBuffer.slice(byteOffset, byteOffset + byteLength);
- };
- // imported from https://github.com/kolodziejczak-sz/uwebsocket-serve
- function serveFile(res /* : HttpResponse */, filepath) {
- const { size } = (0, fs_1.statSync)(filepath);
- const readStream = (0, fs_1.createReadStream)(filepath);
- const destroyReadStream = () => !readStream.destroyed && readStream.destroy();
- const onError = (error) => {
- destroyReadStream();
- throw error;
- };
- const onDataChunk = (chunk) => {
- const arrayBufferChunk = toArrayBuffer(chunk);
- const lastOffset = res.getWriteOffset();
- const [ok, done] = res.tryEnd(arrayBufferChunk, size);
- if (!done && !ok) {
- readStream.pause();
- res.onWritable((offset) => {
- const [ok, done] = res.tryEnd(arrayBufferChunk.slice(offset - lastOffset), size);
- if (!done && ok) {
- readStream.resume();
- }
- return ok;
- });
- }
- };
- res.onAborted(destroyReadStream);
- readStream
- .on("data", onDataChunk)
- .on("error", onError)
- .on("end", destroyReadStream);
- }
- exports.serveFile = serveFile;
|