cluster-adapter.js 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
  1. "use strict";
  2. var __rest = (this && this.__rest) || function (s, e) {
  3. var t = {};
  4. for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p) && e.indexOf(p) < 0)
  5. t[p] = s[p];
  6. if (s != null && typeof Object.getOwnPropertySymbols === "function")
  7. for (var i = 0, p = Object.getOwnPropertySymbols(s); i < p.length; i++) {
  8. if (e.indexOf(p[i]) < 0 && Object.prototype.propertyIsEnumerable.call(s, p[i]))
  9. t[p[i]] = s[p[i]];
  10. }
  11. return t;
  12. };
  13. Object.defineProperty(exports, "__esModule", { value: true });
  14. exports.ClusterAdapterWithHeartbeat = exports.ClusterAdapter = exports.MessageType = void 0;
  15. const in_memory_adapter_1 = require("./in-memory-adapter");
  16. const debug_1 = require("debug");
  17. const crypto_1 = require("crypto");
  18. const debug = (0, debug_1.debug)("socket.io-adapter");
  19. const EMITTER_UID = "emitter";
  20. const DEFAULT_TIMEOUT = 5000;
  21. function randomId() {
  22. return (0, crypto_1.randomBytes)(8).toString("hex");
  23. }
  24. var MessageType;
  25. (function (MessageType) {
  26. MessageType[MessageType["INITIAL_HEARTBEAT"] = 1] = "INITIAL_HEARTBEAT";
  27. MessageType[MessageType["HEARTBEAT"] = 2] = "HEARTBEAT";
  28. MessageType[MessageType["BROADCAST"] = 3] = "BROADCAST";
  29. MessageType[MessageType["SOCKETS_JOIN"] = 4] = "SOCKETS_JOIN";
  30. MessageType[MessageType["SOCKETS_LEAVE"] = 5] = "SOCKETS_LEAVE";
  31. MessageType[MessageType["DISCONNECT_SOCKETS"] = 6] = "DISCONNECT_SOCKETS";
  32. MessageType[MessageType["FETCH_SOCKETS"] = 7] = "FETCH_SOCKETS";
  33. MessageType[MessageType["FETCH_SOCKETS_RESPONSE"] = 8] = "FETCH_SOCKETS_RESPONSE";
  34. MessageType[MessageType["SERVER_SIDE_EMIT"] = 9] = "SERVER_SIDE_EMIT";
  35. MessageType[MessageType["SERVER_SIDE_EMIT_RESPONSE"] = 10] = "SERVER_SIDE_EMIT_RESPONSE";
  36. MessageType[MessageType["BROADCAST_CLIENT_COUNT"] = 11] = "BROADCAST_CLIENT_COUNT";
  37. MessageType[MessageType["BROADCAST_ACK"] = 12] = "BROADCAST_ACK";
  38. MessageType[MessageType["ADAPTER_CLOSE"] = 13] = "ADAPTER_CLOSE";
  39. })(MessageType || (exports.MessageType = MessageType = {}));
  40. function encodeOptions(opts) {
  41. return {
  42. rooms: [...opts.rooms],
  43. except: [...opts.except],
  44. flags: opts.flags,
  45. };
  46. }
  47. function decodeOptions(opts) {
  48. return {
  49. rooms: new Set(opts.rooms),
  50. except: new Set(opts.except),
  51. flags: opts.flags,
  52. };
  53. }
  54. /**
  55. * A cluster-ready adapter. Any extending class must:
  56. *
  57. * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse}
  58. * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
  59. */
  60. class ClusterAdapter extends in_memory_adapter_1.Adapter {
  61. constructor(nsp) {
  62. super(nsp);
  63. this.requests = new Map();
  64. this.ackRequests = new Map();
  65. this.uid = randomId();
  66. }
  67. /**
  68. * Called when receiving a message from another member of the cluster.
  69. *
  70. * @param message
  71. * @param offset
  72. * @protected
  73. */
  74. onMessage(message, offset) {
  75. if (message.uid === this.uid) {
  76. return debug("[%s] ignore message from self", this.uid);
  77. }
  78. if (message.nsp !== this.nsp.name) {
  79. return debug("[%s] ignore message from another namespace (%s)", this.uid, message.nsp);
  80. }
  81. debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid);
  82. switch (message.type) {
  83. case MessageType.BROADCAST: {
  84. const withAck = message.data.requestId !== undefined;
  85. if (withAck) {
  86. super.broadcastWithAck(message.data.packet, decodeOptions(message.data.opts), (clientCount) => {
  87. debug("[%s] waiting for %d client acknowledgements", this.uid, clientCount);
  88. this.publishResponse(message.uid, {
  89. type: MessageType.BROADCAST_CLIENT_COUNT,
  90. data: {
  91. requestId: message.data.requestId,
  92. clientCount,
  93. },
  94. });
  95. }, (arg) => {
  96. debug("[%s] received acknowledgement with value %j", this.uid, arg);
  97. this.publishResponse(message.uid, {
  98. type: MessageType.BROADCAST_ACK,
  99. data: {
  100. requestId: message.data.requestId,
  101. packet: arg,
  102. },
  103. });
  104. });
  105. }
  106. else {
  107. const packet = message.data.packet;
  108. const opts = decodeOptions(message.data.opts);
  109. this.addOffsetIfNecessary(packet, opts, offset);
  110. super.broadcast(packet, opts);
  111. }
  112. break;
  113. }
  114. case MessageType.SOCKETS_JOIN:
  115. super.addSockets(decodeOptions(message.data.opts), message.data.rooms);
  116. break;
  117. case MessageType.SOCKETS_LEAVE:
  118. super.delSockets(decodeOptions(message.data.opts), message.data.rooms);
  119. break;
  120. case MessageType.DISCONNECT_SOCKETS:
  121. super.disconnectSockets(decodeOptions(message.data.opts), message.data.close);
  122. break;
  123. case MessageType.FETCH_SOCKETS: {
  124. debug("[%s] calling fetchSockets with opts %j", this.uid, message.data.opts);
  125. super
  126. .fetchSockets(decodeOptions(message.data.opts))
  127. .then((localSockets) => {
  128. this.publishResponse(message.uid, {
  129. type: MessageType.FETCH_SOCKETS_RESPONSE,
  130. data: {
  131. requestId: message.data.requestId,
  132. sockets: localSockets.map((socket) => {
  133. // remove sessionStore from handshake, as it may contain circular references
  134. const _a = socket.handshake, { sessionStore } = _a, handshake = __rest(_a, ["sessionStore"]);
  135. return {
  136. id: socket.id,
  137. handshake,
  138. rooms: [...socket.rooms],
  139. data: socket.data,
  140. };
  141. }),
  142. },
  143. });
  144. });
  145. break;
  146. }
  147. case MessageType.SERVER_SIDE_EMIT: {
  148. const packet = message.data.packet;
  149. const withAck = message.data.requestId !== undefined;
  150. if (!withAck) {
  151. this.nsp._onServerSideEmit(packet);
  152. return;
  153. }
  154. let called = false;
  155. const callback = (arg) => {
  156. // only one argument is expected
  157. if (called) {
  158. return;
  159. }
  160. called = true;
  161. debug("[%s] calling acknowledgement with %j", this.uid, arg);
  162. this.publishResponse(message.uid, {
  163. type: MessageType.SERVER_SIDE_EMIT_RESPONSE,
  164. data: {
  165. requestId: message.data.requestId,
  166. packet: arg,
  167. },
  168. });
  169. };
  170. this.nsp._onServerSideEmit([...packet, callback]);
  171. break;
  172. }
  173. // @ts-ignore
  174. case MessageType.BROADCAST_CLIENT_COUNT:
  175. // @ts-ignore
  176. case MessageType.BROADCAST_ACK:
  177. // @ts-ignore
  178. case MessageType.FETCH_SOCKETS_RESPONSE:
  179. // @ts-ignore
  180. case MessageType.SERVER_SIDE_EMIT_RESPONSE:
  181. // extending classes may not make a distinction between a ClusterMessage and a ClusterResponse payload and may
  182. // always call the onMessage() method
  183. this.onResponse(message);
  184. break;
  185. default:
  186. debug("[%s] unknown message type: %s", this.uid, message.type);
  187. }
  188. }
  189. /**
  190. * Called when receiving a response from another member of the cluster.
  191. *
  192. * @param response
  193. * @protected
  194. */
  195. onResponse(response) {
  196. var _a, _b;
  197. const requestId = response.data.requestId;
  198. debug("[%s] received response %s to request %s", this.uid, response.type, requestId);
  199. switch (response.type) {
  200. case MessageType.BROADCAST_CLIENT_COUNT: {
  201. (_a = this.ackRequests
  202. .get(requestId)) === null || _a === void 0 ? void 0 : _a.clientCountCallback(response.data.clientCount);
  203. break;
  204. }
  205. case MessageType.BROADCAST_ACK: {
  206. (_b = this.ackRequests.get(requestId)) === null || _b === void 0 ? void 0 : _b.ack(response.data.packet);
  207. break;
  208. }
  209. case MessageType.FETCH_SOCKETS_RESPONSE: {
  210. const request = this.requests.get(requestId);
  211. if (!request) {
  212. return;
  213. }
  214. request.current++;
  215. response.data.sockets.forEach((socket) => request.responses.push(socket));
  216. if (request.current === request.expected) {
  217. clearTimeout(request.timeout);
  218. request.resolve(request.responses);
  219. this.requests.delete(requestId);
  220. }
  221. break;
  222. }
  223. case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
  224. const request = this.requests.get(requestId);
  225. if (!request) {
  226. return;
  227. }
  228. request.current++;
  229. request.responses.push(response.data.packet);
  230. if (request.current === request.expected) {
  231. clearTimeout(request.timeout);
  232. request.resolve(null, request.responses);
  233. this.requests.delete(requestId);
  234. }
  235. break;
  236. }
  237. default:
  238. // @ts-ignore
  239. debug("[%s] unknown response type: %s", this.uid, response.type);
  240. }
  241. }
  242. async broadcast(packet, opts) {
  243. var _a;
  244. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  245. if (!onlyLocal) {
  246. try {
  247. const offset = await this.publishAndReturnOffset({
  248. type: MessageType.BROADCAST,
  249. data: {
  250. packet,
  251. opts: encodeOptions(opts),
  252. },
  253. });
  254. this.addOffsetIfNecessary(packet, opts, offset);
  255. }
  256. catch (e) {
  257. return debug("[%s] error while broadcasting message: %s", this.uid, e.message);
  258. }
  259. }
  260. super.broadcast(packet, opts);
  261. }
  262. /**
  263. * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it
  264. * reconnects after a temporary disconnection.
  265. *
  266. * @param packet
  267. * @param opts
  268. * @param offset
  269. * @private
  270. */
  271. addOffsetIfNecessary(packet, opts, offset) {
  272. var _a;
  273. if (!this.nsp.server.opts.connectionStateRecovery) {
  274. return;
  275. }
  276. const isEventPacket = packet.type === 2;
  277. // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
  278. // restored on another server upon reconnection
  279. const withoutAcknowledgement = packet.id === undefined;
  280. const notVolatile = ((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.volatile) === undefined;
  281. if (isEventPacket && withoutAcknowledgement && notVolatile) {
  282. packet.data.push(offset);
  283. }
  284. }
  285. broadcastWithAck(packet, opts, clientCountCallback, ack) {
  286. var _a;
  287. const onlyLocal = (_a = opts === null || opts === void 0 ? void 0 : opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  288. if (!onlyLocal) {
  289. const requestId = randomId();
  290. this.ackRequests.set(requestId, {
  291. clientCountCallback,
  292. ack,
  293. });
  294. this.publish({
  295. type: MessageType.BROADCAST,
  296. data: {
  297. packet,
  298. requestId,
  299. opts: encodeOptions(opts),
  300. },
  301. });
  302. // we have no way to know at this level whether the server has received an acknowledgement from each client, so we
  303. // will simply clean up the ackRequests map after the given delay
  304. setTimeout(() => {
  305. this.ackRequests.delete(requestId);
  306. }, opts.flags.timeout);
  307. }
  308. super.broadcastWithAck(packet, opts, clientCountCallback, ack);
  309. }
  310. async addSockets(opts, rooms) {
  311. var _a;
  312. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  313. if (!onlyLocal) {
  314. try {
  315. await this.publishAndReturnOffset({
  316. type: MessageType.SOCKETS_JOIN,
  317. data: {
  318. opts: encodeOptions(opts),
  319. rooms,
  320. },
  321. });
  322. }
  323. catch (e) {
  324. debug("[%s] error while publishing message: %s", this.uid, e.message);
  325. }
  326. }
  327. super.addSockets(opts, rooms);
  328. }
  329. async delSockets(opts, rooms) {
  330. var _a;
  331. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  332. if (!onlyLocal) {
  333. try {
  334. await this.publishAndReturnOffset({
  335. type: MessageType.SOCKETS_LEAVE,
  336. data: {
  337. opts: encodeOptions(opts),
  338. rooms,
  339. },
  340. });
  341. }
  342. catch (e) {
  343. debug("[%s] error while publishing message: %s", this.uid, e.message);
  344. }
  345. }
  346. super.delSockets(opts, rooms);
  347. }
  348. async disconnectSockets(opts, close) {
  349. var _a;
  350. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  351. if (!onlyLocal) {
  352. try {
  353. await this.publishAndReturnOffset({
  354. type: MessageType.DISCONNECT_SOCKETS,
  355. data: {
  356. opts: encodeOptions(opts),
  357. close,
  358. },
  359. });
  360. }
  361. catch (e) {
  362. debug("[%s] error while publishing message: %s", this.uid, e.message);
  363. }
  364. }
  365. super.disconnectSockets(opts, close);
  366. }
  367. async fetchSockets(opts) {
  368. var _a;
  369. const [localSockets, serverCount] = await Promise.all([
  370. super.fetchSockets(opts),
  371. this.serverCount(),
  372. ]);
  373. const expectedResponseCount = serverCount - 1;
  374. if (((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local) || expectedResponseCount <= 0) {
  375. return localSockets;
  376. }
  377. const requestId = randomId();
  378. return new Promise((resolve, reject) => {
  379. const timeout = setTimeout(() => {
  380. const storedRequest = this.requests.get(requestId);
  381. if (storedRequest) {
  382. reject(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`));
  383. this.requests.delete(requestId);
  384. }
  385. }, opts.flags.timeout || DEFAULT_TIMEOUT);
  386. const storedRequest = {
  387. type: MessageType.FETCH_SOCKETS,
  388. resolve,
  389. timeout,
  390. current: 0,
  391. expected: expectedResponseCount,
  392. responses: localSockets,
  393. };
  394. this.requests.set(requestId, storedRequest);
  395. this.publish({
  396. type: MessageType.FETCH_SOCKETS,
  397. data: {
  398. opts: encodeOptions(opts),
  399. requestId,
  400. },
  401. });
  402. });
  403. }
  404. async serverSideEmit(packet) {
  405. const withAck = typeof packet[packet.length - 1] === "function";
  406. if (!withAck) {
  407. return this.publish({
  408. type: MessageType.SERVER_SIDE_EMIT,
  409. data: {
  410. packet,
  411. },
  412. });
  413. }
  414. const ack = packet.pop();
  415. const expectedResponseCount = (await this.serverCount()) - 1;
  416. debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount);
  417. if (expectedResponseCount <= 0) {
  418. return ack(null, []);
  419. }
  420. const requestId = randomId();
  421. const timeout = setTimeout(() => {
  422. const storedRequest = this.requests.get(requestId);
  423. if (storedRequest) {
  424. ack(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`), storedRequest.responses);
  425. this.requests.delete(requestId);
  426. }
  427. }, DEFAULT_TIMEOUT);
  428. const storedRequest = {
  429. type: MessageType.SERVER_SIDE_EMIT,
  430. resolve: ack,
  431. timeout,
  432. current: 0,
  433. expected: expectedResponseCount,
  434. responses: [],
  435. };
  436. this.requests.set(requestId, storedRequest);
  437. this.publish({
  438. type: MessageType.SERVER_SIDE_EMIT,
  439. data: {
  440. requestId, // the presence of this attribute defines whether an acknowledgement is needed
  441. packet,
  442. },
  443. });
  444. }
  445. publish(message) {
  446. debug("[%s] sending message %s", this.uid, message.type);
  447. this.publishAndReturnOffset(message).catch((err) => {
  448. debug("[%s] error while publishing message: %s", this.uid, err);
  449. });
  450. }
  451. publishAndReturnOffset(message) {
  452. message.uid = this.uid;
  453. message.nsp = this.nsp.name;
  454. return this.doPublish(message);
  455. }
  456. publishResponse(requesterUid, response) {
  457. response.uid = this.uid;
  458. response.nsp = this.nsp.name;
  459. debug("[%s] sending response %s to %s", this.uid, response.type, requesterUid);
  460. this.doPublishResponse(requesterUid, response).catch((err) => {
  461. debug("[%s] error while publishing response: %s", this.uid, err);
  462. });
  463. }
  464. }
  465. exports.ClusterAdapter = ClusterAdapter;
  466. class ClusterAdapterWithHeartbeat extends ClusterAdapter {
  467. constructor(nsp, opts) {
  468. super(nsp);
  469. this.nodesMap = new Map(); // uid => timestamp of last message
  470. this.customRequests = new Map();
  471. this._opts = Object.assign({
  472. heartbeatInterval: 5000,
  473. heartbeatTimeout: 10000,
  474. }, opts);
  475. this.cleanupTimer = setInterval(() => {
  476. const now = Date.now();
  477. this.nodesMap.forEach((lastSeen, uid) => {
  478. const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
  479. if (nodeSeemsDown) {
  480. debug("[%s] node %s seems down", this.uid, uid);
  481. this.removeNode(uid);
  482. }
  483. });
  484. }, 1000);
  485. }
  486. init() {
  487. this.publish({
  488. type: MessageType.INITIAL_HEARTBEAT,
  489. });
  490. }
  491. scheduleHeartbeat() {
  492. if (this.heartbeatTimer) {
  493. this.heartbeatTimer.refresh();
  494. }
  495. else {
  496. this.heartbeatTimer = setTimeout(() => {
  497. this.publish({
  498. type: MessageType.HEARTBEAT,
  499. });
  500. }, this._opts.heartbeatInterval);
  501. }
  502. }
  503. close() {
  504. this.publish({
  505. type: MessageType.ADAPTER_CLOSE,
  506. });
  507. clearTimeout(this.heartbeatTimer);
  508. if (this.cleanupTimer) {
  509. clearInterval(this.cleanupTimer);
  510. }
  511. }
  512. onMessage(message, offset) {
  513. if (message.uid === this.uid) {
  514. return debug("[%s] ignore message from self", this.uid);
  515. }
  516. if (message.uid && message.uid !== EMITTER_UID) {
  517. // we track the UID of each sender to know how many servers there are in the cluster
  518. this.nodesMap.set(message.uid, Date.now());
  519. }
  520. switch (message.type) {
  521. case MessageType.INITIAL_HEARTBEAT:
  522. this.publish({
  523. type: MessageType.HEARTBEAT,
  524. });
  525. break;
  526. case MessageType.HEARTBEAT:
  527. // nothing to do
  528. break;
  529. case MessageType.ADAPTER_CLOSE:
  530. this.removeNode(message.uid);
  531. break;
  532. default:
  533. super.onMessage(message, offset);
  534. }
  535. }
  536. serverCount() {
  537. return Promise.resolve(1 + this.nodesMap.size);
  538. }
  539. publish(message) {
  540. this.scheduleHeartbeat();
  541. return super.publish(message);
  542. }
  543. async serverSideEmit(packet) {
  544. const withAck = typeof packet[packet.length - 1] === "function";
  545. if (!withAck) {
  546. return this.publish({
  547. type: MessageType.SERVER_SIDE_EMIT,
  548. data: {
  549. packet,
  550. },
  551. });
  552. }
  553. const ack = packet.pop();
  554. const expectedResponseCount = this.nodesMap.size;
  555. debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount);
  556. if (expectedResponseCount <= 0) {
  557. return ack(null, []);
  558. }
  559. const requestId = randomId();
  560. const timeout = setTimeout(() => {
  561. const storedRequest = this.customRequests.get(requestId);
  562. if (storedRequest) {
  563. ack(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`), storedRequest.responses);
  564. this.customRequests.delete(requestId);
  565. }
  566. }, DEFAULT_TIMEOUT);
  567. const storedRequest = {
  568. type: MessageType.SERVER_SIDE_EMIT,
  569. resolve: ack,
  570. timeout,
  571. missingUids: new Set([...this.nodesMap.keys()]),
  572. responses: [],
  573. };
  574. this.customRequests.set(requestId, storedRequest);
  575. this.publish({
  576. type: MessageType.SERVER_SIDE_EMIT,
  577. data: {
  578. requestId, // the presence of this attribute defines whether an acknowledgement is needed
  579. packet,
  580. },
  581. });
  582. }
  583. async fetchSockets(opts) {
  584. var _a;
  585. const [localSockets, serverCount] = await Promise.all([
  586. super.fetchSockets({
  587. rooms: opts.rooms,
  588. except: opts.except,
  589. flags: {
  590. local: true,
  591. },
  592. }),
  593. this.serverCount(),
  594. ]);
  595. const expectedResponseCount = serverCount - 1;
  596. if (((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local) || expectedResponseCount <= 0) {
  597. return localSockets;
  598. }
  599. const requestId = randomId();
  600. return new Promise((resolve, reject) => {
  601. const timeout = setTimeout(() => {
  602. const storedRequest = this.customRequests.get(requestId);
  603. if (storedRequest) {
  604. reject(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`));
  605. this.customRequests.delete(requestId);
  606. }
  607. }, opts.flags.timeout || DEFAULT_TIMEOUT);
  608. const storedRequest = {
  609. type: MessageType.FETCH_SOCKETS,
  610. resolve,
  611. timeout,
  612. missingUids: new Set([...this.nodesMap.keys()]),
  613. responses: localSockets,
  614. };
  615. this.customRequests.set(requestId, storedRequest);
  616. this.publish({
  617. type: MessageType.FETCH_SOCKETS,
  618. data: {
  619. opts: encodeOptions(opts),
  620. requestId,
  621. },
  622. });
  623. });
  624. }
  625. onResponse(response) {
  626. const requestId = response.data.requestId;
  627. debug("[%s] received response %s to request %s", this.uid, response.type, requestId);
  628. switch (response.type) {
  629. case MessageType.FETCH_SOCKETS_RESPONSE: {
  630. const request = this.customRequests.get(requestId);
  631. if (!request) {
  632. return;
  633. }
  634. response.data.sockets.forEach((socket) => request.responses.push(socket));
  635. request.missingUids.delete(response.uid);
  636. if (request.missingUids.size === 0) {
  637. clearTimeout(request.timeout);
  638. request.resolve(request.responses);
  639. this.customRequests.delete(requestId);
  640. }
  641. break;
  642. }
  643. case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
  644. const request = this.customRequests.get(requestId);
  645. if (!request) {
  646. return;
  647. }
  648. request.responses.push(response.data.packet);
  649. request.missingUids.delete(response.uid);
  650. if (request.missingUids.size === 0) {
  651. clearTimeout(request.timeout);
  652. request.resolve(null, request.responses);
  653. this.customRequests.delete(requestId);
  654. }
  655. break;
  656. }
  657. default:
  658. super.onResponse(response);
  659. }
  660. }
  661. removeNode(uid) {
  662. this.customRequests.forEach((request, requestId) => {
  663. request.missingUids.delete(uid);
  664. if (request.missingUids.size === 0) {
  665. clearTimeout(request.timeout);
  666. if (request.type === MessageType.FETCH_SOCKETS) {
  667. request.resolve(request.responses);
  668. }
  669. else if (request.type === MessageType.SERVER_SIDE_EMIT) {
  670. request.resolve(null, request.responses);
  671. }
  672. this.customRequests.delete(requestId);
  673. }
  674. });
  675. this.nodesMap.delete(uid);
  676. }
  677. }
  678. exports.ClusterAdapterWithHeartbeat = ClusterAdapterWithHeartbeat;