// persistence.js
  1. const { Readable } = require('stream')
  2. const QlobberSub = require('qlobber/aedes/qlobber-sub')
  3. const { QlobberTrue } = require('qlobber')
  4. const Packet = require('aedes-packet')
  // Options handed to every qlobber matcher built in this file: topics are
  // split on '/', with '+' and '#' configured as the one-level and
  // multi-level wildcards.
  const QlobberOpts = {
    separator: '/',
    wildcard_one: '+',
    wildcard_some: '#'
  }

  // Readable name for the `createOnEmpty` argument of getMapRef().
  const CREATE_ON_EMPTY = true
  /**
   * Chain several iterables into one sequence.
   *
   * @param {Iterable<Iterable<*>>} iterables - sources, consumed in order
   * @yields {*} every item of the first source, then the second, and so on
   */
  function * multiIterables (iterables) {
    for (const source of iterables) {
      for (const item of source) {
        yield item
      }
    }
  }
  /**
   * Lazily select the retained packets whose topic matches one pattern.
   *
   * A fresh QlobberTrue matcher is built per call and loaded with `pattern`;
   * each retained topic is then tested against it.
   *
   * @param {Iterable<[string, object]>} retained - (topic, packet) pairs
   * @param {string} pattern - topic pattern added to the matcher
   * @yields {object} the packet of every matching entry, in iteration order
   */
  function * retainedMessagesByPattern (retained, pattern) {
    const matcher = new QlobberTrue(QlobberOpts)
    matcher.add(pattern)
    for (const entry of retained) {
      if (!matcher.test(entry[0])) {
        continue
      }
      yield entry[1]
    }
  }
  /**
   * Lazily select the wills whose broker is not listed in `brokers`.
   *
   * @param {Map<string, object>} wills - stored wills; only the values are read
   * @param {object} brokers - lookup keyed by broker id; a truthy entry
   *   excludes the wills carrying that `brokerId`
   * @yields {object} every will whose `brokerId` has no truthy entry
   */
  function * willsByBrokers (wills, brokers) {
    for (const candidate of wills.values()) {
      if (brokers[candidate.brokerId]) {
        continue
      }
      yield candidate
    }
  }
  /**
   * Lazily list the clients that hold a subscription for exactly `topic`.
   *
   * The check is a plain key lookup on each client's topic map; no wildcard
   * matching is performed here.
   *
   * @param {Iterable<[string, Map<string, object>]>} subscriptions -
   *   (clientId, topic map) pairs
   * @param {string} topic - key looked up in each topic map
   * @yields {string} the clientId of every pair whose map has `topic`
   */
  function * clientListbyTopic (subscriptions, topic) {
    for (const pair of subscriptions) {
      const topics = pair[1]
      if (!topics.has(topic)) {
        continue
      }
      yield pair[0]
    }
  }
  /**
   * Persistence backend that keeps everything in process memory: retained
   * messages, client subscriptions, per-client outgoing queues, per-client
   * incoming packets and wills.
   *
   * Results are delivered through Node-style callbacks (`cb(err, ...)`) or as
   * Readable streams built with `Readable.from`.
   *
   * `putWill` reads `this.broker.id`; nothing in this class assigns `broker`,
   * so the owner of the instance has to set it before wills are stored.
   */
  class MemoryPersistence {
    // private class members start with #
    #retained
    #subscriptions
    #outgoing
    #incoming
    #wills
    #clientsCount
    #trie

    constructor () {
      // using Maps for convenience and security (risk of prototype pollution)
      // Map ( topic -> packet )
      this.#retained = new Map()
      // Map ( clientId -> Map( topic -> { qos, rh, rap, nl } ))
      this.#subscriptions = new Map()
      // Map ( clientId -> [ packet ] )
      this.#outgoing = new Map()
      // Map ( clientId -> null-prototype object { messageId -> Packet } )
      this.#incoming = new Map()
      // Map ( clientId -> will )
      this.#wills = new Map()
      // number of clients that currently have an entry in #subscriptions
      this.#clientsCount = 0
      // matcher that only ever receives subscriptions with qos > 0
      this.#trie = new QlobberSub(QlobberOpts)
    }

    /**
     * Store, replace or clear the retained message of a topic.
     *
     * @param {object} pkt - packet with `topic` and `payload`; a shallow copy
     *   is stored, and a payload of length 0 removes the topic's entry instead
     * @param {function(null): void} cb - called synchronously with `null`
     */
    storeRetained (pkt, cb) {
      const packet = Object.assign({}, pkt)
      if (packet.payload.length === 0) {
        this.#retained.delete(packet.topic)
      } else {
        this.#retained.set(packet.topic, packet)
      }
      cb(null)
    }

    /**
     * Stream the retained packets matching any of several patterns.
     *
     * Each pattern is evaluated on its own and the results are concatenated,
     * so a packet matching two patterns is emitted twice.
     *
     * @param {string[]} patterns - topic patterns
     * @returns {Readable} object stream of retained packets
     */
    createRetainedStreamCombi (patterns) {
      const iterables = patterns.map((p) => {
        return retainedMessagesByPattern(this.#retained, p)
      })
      return Readable.from(multiIterables(iterables))
    }

    /**
     * Stream the retained packets matching one pattern.
     *
     * @param {string} pattern - topic pattern
     * @returns {Readable} object stream of retained packets
     */
    createRetainedStream (pattern) {
      return Readable.from(retainedMessagesByPattern(this.#retained, pattern))
    }

    /**
     * Record subscriptions for a client.
     *
     * Every subscription is saved in the client's topic map. Only those with
     * qos > 0 are added to the matcher; a topic re-subscribed at qos 0 is
     * removed from the matcher if it was previously stored with qos > 0.
     *
     * @param {{id: string}} client
     * @param {Array<{topic: string, qos: number, rh: *, rap: *, nl: *}>} subs
     * @param {function(null, object): void} cb - called with `(null, client)`
     */
    addSubscriptions (client, subs, cb) {
      let stored = this.#subscriptions.get(client.id)
      const trie = this.#trie

      if (!stored) {
        // first subscription call for this client: start counting it
        stored = new Map()
        this.#subscriptions.set(client.id, stored)
        this.#clientsCount++
      }

      for (const sub of subs) {
        const storedSub = stored.get(sub.topic)
        if (sub.qos > 0) {
          trie.add(sub.topic, {
            clientId: client.id,
            topic: sub.topic,
            qos: sub.qos,
            rh: sub.rh,
            rap: sub.rap,
            nl: sub.nl
          })
        } else if (storedSub?.qos > 0) {
          // downgraded to qos 0: it must no longer be in the matcher
          trie.remove(sub.topic, {
            clientId: client.id,
            topic: sub.topic
          })
        }
        stored.set(sub.topic, { qos: sub.qos, rh: sub.rh, rap: sub.rap, nl: sub.nl })
      }

      cb(null, client)
    }

    /**
     * Remove subscriptions of a client by topic.
     *
     * Unknown topics are ignored. When the client's last subscription goes
     * away, its entry is dropped and the client count is decremented.
     *
     * @param {{id: string}} client
     * @param {string[]} subs - topics to remove
     * @param {function(null, object): void} cb - called with `(null, client)`
     */
    removeSubscriptions (client, subs, cb) {
      const stored = this.#subscriptions.get(client.id)
      const trie = this.#trie

      if (stored) {
        for (const topic of subs) {
          const storedSub = stored.get(topic)
          if (storedSub !== undefined) {
            // only qos > 0 subscriptions were ever added to the matcher
            if (storedSub.qos > 0) {
              trie.remove(topic, { clientId: client.id, topic })
            }
            stored.delete(topic)
          }
        }

        if (stored.size === 0) {
          this.#clientsCount--
          this.#subscriptions.delete(client.id)
        }
      }

      cb(null, client)
    }

    /**
     * List the subscriptions stored for a client.
     *
     * @param {{id: string}} client
     * @param {function(null, ?Array<object>, object): void} cb - called with
     *   `(null, subs, client)`; `subs` is `null` when the client has no entry,
     *   otherwise an array of `{ topic, qos, rh, rap, nl }`
     */
    subscriptionsByClient (client, cb) {
      let subs = null
      const stored = this.#subscriptions.get(client.id)
      if (stored) {
        subs = []
        for (const [topic, storedSub] of stored) {
          subs.push({ topic, ...storedSub })
        }
      }
      cb(null, subs, client)
    }

    /**
     * Report the matcher's subscription count and the number of clients that
     * have stored subscriptions.
     *
     * @param {function(null, number, number): *} cb - called with
     *   `(null, subscriptionsCount, clientsCount)`
     * @returns {*} whatever `cb` returns
     */
    countOffline (cb) {
      return cb(null, this.#trie.subscriptionsCount, this.#clientsCount)
    }

    /**
     * Look up the matcher entries for a topic.
     *
     * @param {string} pattern - value handed to the matcher's `match`
     * @param {function(null, *): void} cb - called with `(null, matches)`
     */
    subscriptionsByTopic (pattern, cb) {
      cb(null, this.#trie.match(pattern))
    }

    /**
     * Drop every subscription of a client.
     *
     * @param {{id: string}} client
     * @param {function(null, object): void} cb - called with `(null, client)`
     */
    cleanSubscriptions (client, cb) {
      const trie = this.#trie
      const stored = this.#subscriptions.get(client.id)

      if (stored) {
        for (const [topic, storedSub] of stored) {
          if (storedSub.qos > 0) {
            trie.remove(topic, { clientId: client.id, topic })
          }
        }
        this.#clientsCount--
        this.#subscriptions.delete(client.id)
      }

      cb(null, client)
    }

    /**
     * Append a copy of `packet` (wrapped in `Packet`) to the outgoing queue of
     * the client named by `sub.clientId`, creating the queue when needed.
     */
    #outgoingEnqueuePerSub (sub, packet) {
      const id = sub.clientId
      const queue = getMapRef(this.#outgoing, id, [], CREATE_ON_EMPTY)
      queue.push(new Packet(packet))
    }

    /**
     * Queue a packet for one subscriber.
     *
     * @param {{clientId: string}} sub
     * @param {object} packet
     * @param {function(): void} cb - invoked on the next tick, no arguments
     */
    outgoingEnqueue (sub, packet, cb) {
      this.#outgoingEnqueuePerSub(sub, packet)
      process.nextTick(cb)
    }

    /**
     * Queue a packet for several subscribers.
     *
     * @param {Array<{clientId: string}>} subs
     * @param {object} packet
     * @param {function(): void} cb - invoked on the next tick, no arguments
     */
    outgoingEnqueueCombi (subs, packet, cb) {
      for (let i = 0; i < subs.length; i++) {
        this.#outgoingEnqueuePerSub(subs[i], packet)
      }
      process.nextTick(cb)
    }

    /**
     * Update a queued outgoing packet of a client.
     *
     * A queued packet with the same `brokerId` and `brokerCounter` gets the
     * new `messageId`. A queued packet from a different broker id with the
     * same `messageId` is replaced by `packet` altogether.
     *
     * @param {{id: string}} client
     * @param {object} packet
     * @param {function(?Error, object, object): *} cb - called with
     *   `(null, client, packet)` on success, or with an Error
     *   ('no such packet') as first argument when nothing matched
     * @returns {*} whatever `cb` returns on the success paths
     */
    outgoingUpdate (client, packet, cb) {
      const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)

      let temp
      for (let i = 0; i < outgoing.length; i++) {
        temp = outgoing[i]
        if (temp.brokerId === packet.brokerId) {
          if (temp.brokerCounter === packet.brokerCounter) {
            temp.messageId = packet.messageId
            return cb(null, client, packet)
          }
          /*
            Maximum of messageId (packet identifier) is 65535 and will be rotated,
            brokerCounter is to ensure the packet identifier be unique.
            The for loop is going to search which packet messageId should be updated
            in the #outgoing queue.
            If there is a case that brokerCounter is different but messageId is same,
            we need to let the loop keep searching
          */
        } else if (temp.messageId === packet.messageId) {
          outgoing[i] = packet
          return cb(null, client, packet)
        }
      }

      cb(new Error('no such packet'), client, packet)
    }

    /**
     * Remove the first queued outgoing packet carrying `packet.messageId`.
     *
     * @param {{id: string}} client
     * @param {{messageId: *}} packet
     * @param {function(null=, object=): *} cb - called with
     *   `(null, removedPacket)`, or with no arguments when nothing matched
     * @returns {*} whatever `cb` returns when a packet was removed
     */
    outgoingClearMessageId (client, packet, cb) {
      const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)

      let temp
      for (let i = 0; i < outgoing.length; i++) {
        temp = outgoing[i]
        if (temp.messageId === packet.messageId) {
          outgoing.splice(i, 1)
          return cb(null, temp)
        }
      }

      cb()
    }

    /**
     * Stream a snapshot of a client's outgoing queue.
     *
     * @param {{id: string}} client
     * @returns {Readable} object stream over the packets queued at call time
     */
    outgoingStream (client) {
      // shallow clone the outgoing queue for this client to avoid race conditions
      const outgoing = [...getMapRef(this.#outgoing, client.id, [])]
      return Readable.from(outgoing)
    }

    /**
     * Remember an incoming packet under its `messageId`.
     *
     * @param {{id: string}} client
     * @param {object} packet
     * @param {function(null): void} cb - called synchronously with `null`
     */
    incomingStorePacket (client, packet, cb) {
      // null-prototype dictionary: only keys stored here can ever be found
      const store = getMapRef(this.#incoming, client.id, Object.create(null), CREATE_ON_EMPTY)
      const stored = new Packet(packet)
      // the id is re-applied after construction, as the original code did
      // NOTE(review): presumably Packet does not always carry it over - confirm
      stored.messageId = packet.messageId
      store[packet.messageId] = stored
      cb(null)
    }

    /**
     * Fetch a stored incoming packet by `messageId`.
     *
     * This is a pure read: no per-client store is created for unknown clients.
     *
     * @param {{id: string}} client
     * @param {{messageId: *}} packet
     * @param {function(?Error, object=): void} cb - called with
     *   `(null, storedPacket)`, or with an Error ('no such packet') and
     *   `undefined` when the id is not stored
     */
    incomingGetPacket (client, packet, cb) {
      const store = getMapRef(this.#incoming, client.id, Object.create(null))
      const found = store[packet.messageId]
      let err = null

      if (!found) {
        err = new Error('no such packet')
      }

      cb(err, found)
    }

    /**
     * Forget a stored incoming packet.
     *
     * @param {{id: string}} client
     * @param {{messageId: *}} packet
     * @param {function(?Error): void} cb - called with `null`, or with an
     *   Error ('no such packet') when the id is not stored
     */
    incomingDelPacket (client, packet, cb) {
      const store = getMapRef(this.#incoming, client.id, Object.create(null))
      const toDelete = store[packet.messageId]
      let err = null

      if (!toDelete) {
        err = new Error('no such packet')
      } else {
        delete store[packet.messageId]
      }

      cb(err)
    }

    /**
     * Store the will of a client.
     *
     * The passed packet itself is stored (not a copy) and is stamped with
     * `brokerId` (from `this.broker.id`) and `clientId`.
     *
     * @param {{id: string}} client
     * @param {object} packet - will packet; mutated as described above
     * @param {function(null, object): void} cb - called with `(null, client)`
     */
    putWill (client, packet, cb) {
      packet.brokerId = this.broker.id
      packet.clientId = client.id
      this.#wills.set(client.id, packet)
      cb(null, client)
    }

    /**
     * Fetch the will of a client.
     *
     * @param {{id: string}} client
     * @param {function(null, object=, object): void} cb - called with
     *   `(null, will, client)`; `will` is `undefined` when none is stored
     */
    getWill (client, cb) {
      cb(null, this.#wills.get(client.id), client)
    }

    /**
     * Remove the will of a client and hand it back.
     *
     * @param {{id: string}} client
     * @param {function(null, object=, object): void} cb - called with
     *   `(null, removedWill, client)`; `removedWill` is `undefined` when none
     *   was stored
     */
    delWill (client, cb) {
      const will = this.#wills.get(client.id)
      this.#wills.delete(client.id)
      cb(null, will, client)
    }

    /**
     * Stream the wills whose broker is not listed in `brokers`.
     *
     * @param {object} [brokers={}] - lookup keyed by broker id; wills of
     *   brokers with a truthy entry are skipped
     * @returns {Readable} object stream of wills
     */
    streamWill (brokers = {}) {
      return Readable.from(willsByBrokers(this.#wills, brokers))
    }

    /**
     * Stream the ids of the clients subscribed to exactly `topic`.
     *
     * @param {string} topic
     * @returns {Readable} stream of client ids
     */
    getClientList (topic) {
      return Readable.from(clientListbyTopic(this.#subscriptions, topic))
    }

    /**
     * Release the retained-message map. Only `#retained` is reset (to `null`);
     * the other collections are left as they are.
     *
     * @param {function(null): void} [cb] - called with `null` when provided
     */
    destroy (cb) {
      this.#retained = null
      if (cb) {
        cb(null)
      }
    }
  }
  /**
   * Read `key` from `map`, falling back to a default when nothing is stored.
   *
   * "Nothing stored" means `map.get(key)` is `undefined`. The same test decides
   * both whether the default is returned and whether it is written back, so a
   * stored falsy value (0, '', false) is returned as-is rather than being
   * shadowed by the default.
   *
   * @param {Map} map - map to read (and optionally populate)
   * @param {*} key - key to look up
   * @param {*} ifEmpty - default returned when the key holds no value
   * @param {boolean} [createOnEmpty=false] - when true, the default is also
   *   stored under `key`, so later calls return the same reference
   * @returns {*} the stored value, or `ifEmpty`
   */
  function getMapRef (map, key, ifEmpty, createOnEmpty = false) {
    const value = map.get(key)
    if (value !== undefined) {
      return value
    }
    if (createOnEmpty) {
      map.set(key, ifEmpty)
    }
    return ifEmpty
  }
  // The module exports a factory: every call builds an independent
  // MemoryPersistence. The Packet constructor is exposed as a property of
  // that factory.
  module.exports = Object.assign(() => new MemoryPersistence(), { Packet })