// in-memory-adapter.js (13 KB)
  1. "use strict";
  2. var _a;
  3. Object.defineProperty(exports, "__esModule", { value: true });
  4. exports.SessionAwareAdapter = exports.Adapter = void 0;
  5. const events_1 = require("events");
  6. const yeast_1 = require("./contrib/yeast");
  7. const WebSocket = require("ws");
  8. // @ts-expect-error
  9. const canPreComputeFrame = typeof ((_a = WebSocket === null || WebSocket === void 0 ? void 0 : WebSocket.Sender) === null || _a === void 0 ? void 0 : _a.frame) === "function";
  10. class Adapter extends events_1.EventEmitter {
  11. /**
  12. * In-memory adapter constructor.
  13. *
  14. * @param nsp
  15. */
  16. constructor(nsp) {
  17. super();
  18. this.nsp = nsp;
  19. this.rooms = new Map();
  20. this.sids = new Map();
  21. this.encoder = nsp.server.encoder; // nsp is a Namespace object
  22. }
  23. /**
  24. * To be overridden
  25. */
  26. init() { }
  27. /**
  28. * To be overridden
  29. */
  30. close() { }
  31. /**
  32. * Returns the number of Socket.IO servers in the cluster
  33. *
  34. * @public
  35. */
  36. serverCount() {
  37. return Promise.resolve(1);
  38. }
  39. /**
  40. * Adds a socket to a list of room.
  41. *
  42. * @param {SocketId} id the socket id
  43. * @param {Set<Room>} rooms a set of rooms
  44. * @public
  45. */
  46. addAll(id, rooms) {
  47. if (!this.sids.has(id)) {
  48. this.sids.set(id, new Set());
  49. }
  50. for (const room of rooms) {
  51. this.sids.get(id).add(room);
  52. if (!this.rooms.has(room)) {
  53. this.rooms.set(room, new Set());
  54. this.emit("create-room", room);
  55. }
  56. if (!this.rooms.get(room).has(id)) {
  57. this.rooms.get(room).add(id);
  58. this.emit("join-room", room, id);
  59. }
  60. }
  61. }
  62. /**
  63. * Removes a socket from a room.
  64. *
  65. * @param {SocketId} id the socket id
  66. * @param {Room} room the room name
  67. */
  68. del(id, room) {
  69. if (this.sids.has(id)) {
  70. this.sids.get(id).delete(room);
  71. }
  72. this._del(room, id);
  73. }
  74. _del(room, id) {
  75. const _room = this.rooms.get(room);
  76. if (_room != null) {
  77. const deleted = _room.delete(id);
  78. if (deleted) {
  79. this.emit("leave-room", room, id);
  80. }
  81. if (_room.size === 0 && this.rooms.delete(room)) {
  82. this.emit("delete-room", room);
  83. }
  84. }
  85. }
  86. /**
  87. * Removes a socket from all rooms it's joined.
  88. *
  89. * @param {SocketId} id the socket id
  90. */
  91. delAll(id) {
  92. if (!this.sids.has(id)) {
  93. return;
  94. }
  95. for (const room of this.sids.get(id)) {
  96. this._del(room, id);
  97. }
  98. this.sids.delete(id);
  99. }
  100. /**
  101. * Broadcasts a packet.
  102. *
  103. * Options:
  104. * - `flags` {Object} flags for this packet
  105. * - `except` {Array} sids that should be excluded
  106. * - `rooms` {Array} list of rooms to broadcast to
  107. *
  108. * @param {Object} packet the packet object
  109. * @param {Object} opts the options
  110. * @public
  111. */
  112. broadcast(packet, opts) {
  113. const flags = opts.flags || {};
  114. const packetOpts = {
  115. preEncoded: true,
  116. volatile: flags.volatile,
  117. compress: flags.compress,
  118. };
  119. packet.nsp = this.nsp.name;
  120. const encodedPackets = this._encode(packet, packetOpts);
  121. this.apply(opts, (socket) => {
  122. if (typeof socket.notifyOutgoingListeners === "function") {
  123. socket.notifyOutgoingListeners(packet);
  124. }
  125. socket.client.writeToEngine(encodedPackets, packetOpts);
  126. });
  127. }
  128. /**
  129. * Broadcasts a packet and expects multiple acknowledgements.
  130. *
  131. * Options:
  132. * - `flags` {Object} flags for this packet
  133. * - `except` {Array} sids that should be excluded
  134. * - `rooms` {Array} list of rooms to broadcast to
  135. *
  136. * @param {Object} packet the packet object
  137. * @param {Object} opts the options
  138. * @param clientCountCallback - the number of clients that received the packet
  139. * @param ack - the callback that will be called for each client response
  140. *
  141. * @public
  142. */
  143. broadcastWithAck(packet, opts, clientCountCallback, ack) {
  144. const flags = opts.flags || {};
  145. const packetOpts = {
  146. preEncoded: true,
  147. volatile: flags.volatile,
  148. compress: flags.compress,
  149. };
  150. packet.nsp = this.nsp.name;
  151. // we can use the same id for each packet, since the _ids counter is common (no duplicate)
  152. packet.id = this.nsp._ids++;
  153. const encodedPackets = this._encode(packet, packetOpts);
  154. let clientCount = 0;
  155. this.apply(opts, (socket) => {
  156. // track the total number of acknowledgements that are expected
  157. clientCount++;
  158. // call the ack callback for each client response
  159. socket.acks.set(packet.id, ack);
  160. if (typeof socket.notifyOutgoingListeners === "function") {
  161. socket.notifyOutgoingListeners(packet);
  162. }
  163. socket.client.writeToEngine(encodedPackets, packetOpts);
  164. });
  165. clientCountCallback(clientCount);
  166. }
  167. _encode(packet, packetOpts) {
  168. const encodedPackets = this.encoder.encode(packet);
  169. if (canPreComputeFrame &&
  170. encodedPackets.length === 1 &&
  171. typeof encodedPackets[0] === "string") {
  172. // "4" being the "message" packet type in the Engine.IO protocol
  173. const data = Buffer.from("4" + encodedPackets[0]);
  174. // see https://github.com/websockets/ws/issues/617#issuecomment-283002469
  175. // @ts-expect-error
  176. packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, {
  177. readOnly: false,
  178. mask: false,
  179. rsv1: false,
  180. opcode: 1,
  181. fin: true,
  182. });
  183. }
  184. return encodedPackets;
  185. }
  186. /**
  187. * Gets a list of sockets by sid.
  188. *
  189. * @param {Set<Room>} rooms the explicit set of rooms to check.
  190. */
  191. sockets(rooms) {
  192. const sids = new Set();
  193. this.apply({ rooms }, (socket) => {
  194. sids.add(socket.id);
  195. });
  196. return Promise.resolve(sids);
  197. }
  198. /**
  199. * Gets the list of rooms a given socket has joined.
  200. *
  201. * @param {SocketId} id the socket id
  202. */
  203. socketRooms(id) {
  204. return this.sids.get(id);
  205. }
  206. /**
  207. * Returns the matching socket instances
  208. *
  209. * @param opts - the filters to apply
  210. */
  211. fetchSockets(opts) {
  212. const sockets = [];
  213. this.apply(opts, (socket) => {
  214. sockets.push(socket);
  215. });
  216. return Promise.resolve(sockets);
  217. }
  218. /**
  219. * Makes the matching socket instances join the specified rooms
  220. *
  221. * @param opts - the filters to apply
  222. * @param rooms - the rooms to join
  223. */
  224. addSockets(opts, rooms) {
  225. this.apply(opts, (socket) => {
  226. socket.join(rooms);
  227. });
  228. }
  229. /**
  230. * Makes the matching socket instances leave the specified rooms
  231. *
  232. * @param opts - the filters to apply
  233. * @param rooms - the rooms to leave
  234. */
  235. delSockets(opts, rooms) {
  236. this.apply(opts, (socket) => {
  237. rooms.forEach((room) => socket.leave(room));
  238. });
  239. }
  240. /**
  241. * Makes the matching socket instances disconnect
  242. *
  243. * @param opts - the filters to apply
  244. * @param close - whether to close the underlying connection
  245. */
  246. disconnectSockets(opts, close) {
  247. this.apply(opts, (socket) => {
  248. socket.disconnect(close);
  249. });
  250. }
  251. apply(opts, callback) {
  252. const rooms = opts.rooms;
  253. const except = this.computeExceptSids(opts.except);
  254. if (rooms.size) {
  255. const ids = new Set();
  256. for (const room of rooms) {
  257. if (!this.rooms.has(room))
  258. continue;
  259. for (const id of this.rooms.get(room)) {
  260. if (ids.has(id) || except.has(id))
  261. continue;
  262. const socket = this.nsp.sockets.get(id);
  263. if (socket) {
  264. callback(socket);
  265. ids.add(id);
  266. }
  267. }
  268. }
  269. }
  270. else {
  271. for (const [id] of this.sids) {
  272. if (except.has(id))
  273. continue;
  274. const socket = this.nsp.sockets.get(id);
  275. if (socket)
  276. callback(socket);
  277. }
  278. }
  279. }
  280. computeExceptSids(exceptRooms) {
  281. const exceptSids = new Set();
  282. if (exceptRooms && exceptRooms.size > 0) {
  283. for (const room of exceptRooms) {
  284. if (this.rooms.has(room)) {
  285. this.rooms.get(room).forEach((sid) => exceptSids.add(sid));
  286. }
  287. }
  288. }
  289. return exceptSids;
  290. }
  291. /**
  292. * Send a packet to the other Socket.IO servers in the cluster
  293. * @param packet - an array of arguments, which may include an acknowledgement callback at the end
  294. */
  295. serverSideEmit(packet) {
  296. console.warn("this adapter does not support the serverSideEmit() functionality");
  297. }
  298. /**
  299. * Save the client session in order to restore it upon reconnection.
  300. */
  301. persistSession(session) { }
  302. /**
  303. * Restore the session and find the packets that were missed by the client.
  304. * @param pid
  305. * @param offset
  306. */
  307. restoreSession(pid, offset) {
  308. return null;
  309. }
  310. }
  311. exports.Adapter = Adapter;
  312. class SessionAwareAdapter extends Adapter {
  313. constructor(nsp) {
  314. super(nsp);
  315. this.nsp = nsp;
  316. this.sessions = new Map();
  317. this.packets = [];
  318. this.maxDisconnectionDuration =
  319. nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration;
  320. const timer = setInterval(() => {
  321. const threshold = Date.now() - this.maxDisconnectionDuration;
  322. this.sessions.forEach((session, sessionId) => {
  323. const hasExpired = session.disconnectedAt < threshold;
  324. if (hasExpired) {
  325. this.sessions.delete(sessionId);
  326. }
  327. });
  328. for (let i = this.packets.length - 1; i >= 0; i--) {
  329. const hasExpired = this.packets[i].emittedAt < threshold;
  330. if (hasExpired) {
  331. this.packets.splice(0, i + 1);
  332. break;
  333. }
  334. }
  335. }, 60 * 1000);
  336. // prevents the timer from keeping the process alive
  337. timer.unref();
  338. }
  339. persistSession(session) {
  340. session.disconnectedAt = Date.now();
  341. this.sessions.set(session.pid, session);
  342. }
  343. restoreSession(pid, offset) {
  344. const session = this.sessions.get(pid);
  345. if (!session) {
  346. // the session may have expired
  347. return null;
  348. }
  349. const hasExpired = session.disconnectedAt + this.maxDisconnectionDuration < Date.now();
  350. if (hasExpired) {
  351. // the session has expired
  352. this.sessions.delete(pid);
  353. return null;
  354. }
  355. const index = this.packets.findIndex((packet) => packet.id === offset);
  356. if (index === -1) {
  357. // the offset may be too old
  358. return null;
  359. }
  360. const missedPackets = [];
  361. for (let i = index + 1; i < this.packets.length; i++) {
  362. const packet = this.packets[i];
  363. if (shouldIncludePacket(session.rooms, packet.opts)) {
  364. missedPackets.push(packet.data);
  365. }
  366. }
  367. return Promise.resolve(Object.assign(Object.assign({}, session), { missedPackets }));
  368. }
  369. broadcast(packet, opts) {
  370. var _a;
  371. const isEventPacket = packet.type === 2;
  372. // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
  373. // restored on another server upon reconnection
  374. const withoutAcknowledgement = packet.id === undefined;
  375. const notVolatile = ((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.volatile) === undefined;
  376. if (isEventPacket && withoutAcknowledgement && notVolatile) {
  377. const id = (0, yeast_1.yeast)();
  378. // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has
  379. // processed (and the format is backward-compatible)
  380. packet.data.push(id);
  381. this.packets.push({
  382. id,
  383. opts,
  384. data: packet.data,
  385. emittedAt: Date.now(),
  386. });
  387. }
  388. super.broadcast(packet, opts);
  389. }
  390. }
  391. exports.SessionAwareAdapter = SessionAwareAdapter;
  392. function shouldIncludePacket(sessionRooms, opts) {
  393. const included = opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room));
  394. const notExcluded = sessionRooms.every((room) => !opts.except.has(room));
  395. return included && notExcluded;
  396. }