aedes.js 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. 'use strict'
  2. const EventEmitter = require('events')
  3. const util = require('util')
  4. const parallel = require('fastparallel')
  5. const series = require('fastseries')
  6. const { v4: uuidv4 } = require('uuid')
  7. const reusify = require('reusify')
  8. const { pipeline } = require('stream')
  9. const Packet = require('aedes-packet')
  10. const memory = require('aedes-persistence')
  11. const mqemitter = require('mqemitter')
  12. const Client = require('./lib/client')
  13. const { $SYS_PREFIX, bulk } = require('./lib/utils')
  14. module.exports = Aedes.createBroker = Aedes
  15. const defaultOptions = {
  16. concurrency: 100,
  17. heartbeatInterval: 60000, // 1 minute
  18. connectTimeout: 30000, // 30 secs
  19. decodeProtocol: null,
  20. preConnect: defaultPreConnect,
  21. authenticate: defaultAuthenticate,
  22. authorizePublish: defaultAuthorizePublish,
  23. authorizeSubscribe: defaultAuthorizeSubscribe,
  24. authorizeForward: defaultAuthorizeForward,
  25. published: defaultPublished,
  26. trustProxy: false,
  27. trustedProxies: [],
  28. queueLimit: 42,
  29. maxClientsIdLength: 23,
  30. keepaliveLimit: 0
  31. }
  /**
   * Broker constructor; works both as `new Aedes(opts)` and as `Aedes(opts)`.
   *
   * Copies `defaultOptions`, overlays `opts`, sets up the message queue and the
   * persistence, publishes a birth message, starts the heartbeat timer and the
   * will-cleanup timer, and subscribes to the heartbeat / birth / new-client
   * topics under `$SYS_PREFIX`.
   *
   * @param {object} [opts] - Overrides for `defaultOptions`; `id`, `mq` and
   *   `persistence` are read from it as well.
   * @returns {Aedes|undefined} A fresh instance when called without `new`.
   */
  function Aedes (opts) {
    const that = this

    // Factory form: re-enter through `new`.
    if (!(this instanceof Aedes)) {
      return new Aedes(opts)
    }

    // Private, never-mutated copy of the effective settings.
    const settings = Object.assign({}, defaultOptions, opts)

    // A broker counts as alive while its last heartbeat is at most this old.
    const staleAfter = 3 * settings.heartbeatInterval

    this.id = settings.id || uuidv4()

    // +1 each time a new aedes-packet is constructed;
    // internal track of the last brokerCounter
    this.counter = 0

    this.queueLimit = settings.queueLimit
    this.connectTimeout = settings.connectTimeout
    this.keepaliveLimit = settings.keepaliveLimit
    this.maxClientsIdLength = settings.maxClientsIdLength

    this.mq = settings.mq || mqemitter({
      concurrency: settings.concurrency,
      matchEmptyLevels: true // [MQTT-4.7.1-3]
    })

    this.handle = function handle (conn, req) {
      conn.setMaxListeners(settings.concurrency * 2)
      // every new connection gets its own Client instance;
      // it is returned just to please standard
      const client = new Client(that, conn, req)
      return client
    }

    this.persistence = settings.persistence || memory()
    this.persistence.broker = this

    this._parallel = parallel()
    this._series = series()
    this._enqueuers = reusify(DoEnqueues)

    this.preConnect = settings.preConnect
    this.authenticate = settings.authenticate
    this.authorizePublish = settings.authorizePublish
    this.authorizeSubscribe = settings.authorizeSubscribe
    this.authorizeForward = settings.authorizeForward
    this.published = settings.published

    this.decodeProtocol = settings.decodeProtocol
    this.trustProxy = settings.trustProxy
    this.trustedProxies = settings.trustedProxies

    this.clients = {}
    this.brokers = {}

    const ownTopicRoot = $SYS_PREFIX + that.id
    const heartbeatTopic = ownTopicRoot + '/heartbeat'
    const birthTopic = ownTopicRoot + '/birth'

    this._heartbeatInterval = setInterval(sendHeartbeat, settings.heartbeatInterval)

    const bufId = Buffer.from(that.id, 'utf8')

    // in a cluster env the birth message warns the other broker instances
    // that this broker is alive
    that.publish({ topic: birthTopic, payload: bufId }, noop)

    // Periodic "still here" message carrying this broker's id.
    function sendHeartbeat () {
      that.publish({ topic: heartbeatTopic, payload: bufId }, noop)
    }

    this._clearWillInterval = setInterval(() => {
      // Forget every broker whose last heartbeat is too old.
      for (const brokerId of Object.keys(that.brokers)) {
        if (that.brokers[brokerId] + staleAfter < Date.now()) {
          delete that.brokers[brokerId]
        }
      }

      // Then stream the stored wills, given the remaining brokers, and
      // process them in bulk.
      pipeline(
        that.persistence.streamWill(that.brokers),
        bulk(receiveWills),
        (err) => {
          if (err) {
            that.emit('error', err)
          }
        }
      )
    }, settings.heartbeatInterval * 4)

    // Fans a batch of wills out to checkAndPublish.
    function receiveWills (chunks, done) {
      that._parallel(that, checkAndPublish, chunks, done)
    }

    // Publishes (when authorized) and then deletes a will, unless the broker
    // that owns it has been heard from recently.
    // NOTE(review): `this` is presumably the broker here, since the function
    // is only handed to `_parallel` together with `that` - confirm against
    // fastparallel.
    function checkAndPublish (will, done) {
      const lastSeen = that.brokers[will.brokerId]
      const ownerAlive = lastSeen !== undefined && lastSeen + staleAfter >= Date.now()

      if (ownerAlive) return done()

      // Deletes the stored will once publishing is over (or was skipped).
      function removeWill (err) {
        if (err) { return done(err) }
        that.persistence.delWill({
          id: will.clientId,
          brokerId: will.brokerId
        }, done)
      }

      // randomize this, so that multiple brokers
      // do not publish the same wills at the same time
      const owner = that.clients[will.clientId] || null
      this.authorizePublish(owner, will, (authErr) => {
        // an unauthorized will is dropped without being published
        if (authErr) { return removeWill() }
        that.publish(will, removeWill)
      })
    }

    // Remember when each broker was last heard from.
    this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) {
      const brokerId = packet.payload.toString()
      that.brokers[brokerId] = Date.now()
      done()
    })

    // Another broker was (re)born: reset the duplicates counter kept for it.
    this.mq.on($SYS_PREFIX + '+/birth', function brokerBorn (packet, done) {
      const brokerId = packet.payload.toString()

      if (brokerId === that.id) {
        done()
        return
      }

      for (const clientId in that.clients) {
        delete that.clients[clientId].duplicates[brokerId]
      }
      done()
    })

    // A client id was registered on another broker: drop our copy of it.
    this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) {
      const serverId = packet.topic.split('/')[1]
      const clientId = packet.payload.toString()
      const local = that.clients[clientId]

      if (!local || serverId === that.id) {
        done()
        return
      }

      if (local.closed) {
        // already closed, so just remove it from the list
        that.deleteClient(clientId)
        done()
      } else {
        local.close(done)
      }
    })

    // metadata
    this.connectedClients = 0
    this.closed = false
  }

  util.inherits(Aedes, EventEmitter)
  /**
   * Publish step: hands retained packets to the persistence.
   * Runs with `this` set to an object exposing `broker`.
   *
   * @param {object} packet - Packet being published; only `retain` is inspected.
   * @param {Function} done - Called directly when the packet is not retained,
   *   otherwise passed on to `persistence.storeRetained`.
   */
  function storeRetained (packet, done) {
    if (!packet.retain) {
      done()
      return
    }

    this.broker.persistence.storeRetained(packet, done)
  }
  /**
   * Publish step: emits the packet on the broker's message queue.
   * Runs with `this` set to an object exposing `broker` and `client`.
   *
   * @param {object} packet - Packet to emit; receives `clientId` when a client
   *   is attached to the current publish.
   * @param {Function} done - Forwarded to `mq.emit`.
   */
  function emitPacket (packet, done) {
    const { client, broker } = this

    if (client) {
      packet.clientId = client.id
    }

    broker.mq.emit(packet, done)
  }
  /**
   * Publish step for QoS > 0: looks up the subscriptions matching the packet's
   * topic and lets a pooled enqueuer queue the packet for them.
   * Runs with `this` set to an object exposing `broker`.
   *
   * @param {object} packet - Packet being published.
   * @param {Function} done - Stored on the enqueuer as its `complete` callback.
   */
  function enqueueOffline (packet, done) {
    const { broker } = this
    const enqueuer = broker._enqueuers.get()

    // Load the pooled object with everything its `done` callback will need.
    Object.assign(enqueuer, {
      complete: done,
      packet,
      topic: packet.topic,
      broker
    })

    broker.persistence.subscriptionsByTopic(packet.topic, enqueuer.done)
  }
  /**
   * Reusable state holder for a single offline-enqueue operation.
   *
   * `done` is created once per instance and closes over it, so it can be
   * passed around as a plain callback without binding.
   */
  function DoEnqueues () {
    this.next = null
    this.complete = null
    this.packet = null
    this.topic = null
    this.broker = null

    const self = this

    /**
     * Receives the subscriptions looked up for the stored topic.
     *
     * @param {Error|null} err - Lookup failure, if any.
     * @param {Array<object>} subs - Subscriptions matching the topic.
     */
    this.done = function doneEnqueue (err, subs) {
      const { broker, packet, complete, topic } = self

      if (err) {
        // is this really recoverable?
        // let's just error the whole aedes
        // https://nodejs.org/api/events.html#events_error_events
        broker.emit('error', err)
        return
      }

      // For topics under $SYS_PREFIX, skip subscriptions whose filter starts
      // with a wildcard.
      const targets = topic.indexOf($SYS_PREFIX) === 0
        ? subs.filter(removeSharp)
        : subs

      // Clear the per-operation fields before handing the work off.
      self.packet = null
      self.complete = null
      self.topic = null

      broker.persistence.outgoingEnqueueCombi(targets, packet, complete)
      broker._enqueuers.release(self)
    }
  }
  /**
   * Filter predicate: keeps a subscription unless its topic starts with a
   * wildcard character.
   *
   * @param {{topic: string}} sub - Subscription to test.
   * @returns {boolean} `false` when the first character is '+' or '#'.
   */
  function removeSharp (sub) {
    // char codes: '+' is 43, '#' is 35
    const first = sub.topic.charCodeAt(0)
    return !(first === 43 || first === 35)
  }
  /**
   * Final publish step: invokes the broker's `published` hook, then emits the
   * 'publish' event.
   * Runs with `this` set to an object exposing `broker`, `packet` and `client`.
   *
   * @param {*} _ - Ignored.
   * @param {Function} done - Handed to the `published` hook.
   */
  function callPublished (_, done) {
    const { broker, packet, client } = this

    broker.published(packet, client, done)
    broker.emit('publish', packet, client)
  }
  // Step list `publish` selects when the packet's qos is not greater than 0.
  const publishFuncsSimple = [storeRetained, emitPacket, callPublished]

  // Step list `publish` selects when qos > 0: the same steps, with the
  // offline enqueue slotted in before the packet is emitted.
  const publishFuncsQoS = [storeRetained, enqueueOffline, emitPacket, callPublished]
  /**
   * Publishes a packet through the broker's series of publish steps.
   *
   * @param {object} packet - Packet description; wrapped in a `Packet` that is
   *   handed to every step.
   * @param {object|Function} [client] - Originating client; may be omitted, in
   *   which case this argument is taken as the callback.
   * @param {Function} [done] - Called when the series completes.
   */
  Aedes.prototype.publish = function (packet, client, done) {
    // Two-argument form: publish(packet, done).
    const calledWithoutClient = typeof client === 'function'
    const callback = calledWithoutClient ? client : done
    const origin = calledWithoutClient ? null : client

    const p = new Packet(packet, this)
    const steps = p.qos > 0 ? publishFuncsQoS : publishFuncsSimple

    // The state keeps the packet as passed in; the steps receive the wrapper.
    this._series(new PublishState(this, origin, packet), steps, p, callback)
  }
  /**
   * Registers a listener on the broker's message queue.
   *
   * @param {string} topic - Topic to listen on.
   * @param {Function} func - Listener.
   * @param {Function} [done] - Forwarded to `mq.on`.
   */
  Aedes.prototype.subscribe = function (topic, func, done) {
    const { mq } = this
    mq.on(topic, func, done)
  }
  /**
   * Removes a listener from the broker's message queue.
   *
   * @param {string} topic - Topic the listener was registered on.
   * @param {Function} func - Listener to remove.
   * @param {Function} [done] - Forwarded to `mq.removeListener`.
   */
  Aedes.prototype.unsubscribe = function (topic, func, done) {
    const { mq } = this
    mq.removeListener(topic, func, done)
  }
  /**
   * Registers a client, first closing any client already registered under the
   * same id.
   *
   * @param {object} client - Client to register; identified by `client.id`.
   */
  Aedes.prototype.registerClient = function (client) {
    const previous = this.clients[client.id]

    if (!previous) {
      this._finishRegisterClient(client)
      return
    }

    // [MQTT-3.1.4-2]
    previous.close(() => {
      this._finishRegisterClient(client)
    })
  }
  /**
   * Completes a registration: counts and stores the client, emits 'client',
   * and publishes the client id on this broker's `new/clients` topic.
   *
   * @param {object} client - Client being registered.
   */
  Aedes.prototype._finishRegisterClient = function (client) {
    const { id } = client

    this.connectedClients++
    this.clients[id] = client
    this.emit('client', client)

    const announcement = {
      topic: $SYS_PREFIX + this.id + '/new/clients',
      payload: Buffer.from(id, 'utf8')
    }
    this.publish(announcement, noop)
  }
  /**
   * Removes a client from the broker, emits 'clientDisconnect', and publishes
   * the client id on this broker's `disconnect/clients` topic.
   *
   * @param {object} client - Client being removed.
   */
  Aedes.prototype.unregisterClient = function (client) {
    const { id } = client

    this.deleteClient(id)
    this.emit('clientDisconnect', client)

    const notice = {
      topic: $SYS_PREFIX + this.id + '/disconnect/clients',
      payload: Buffer.from(id, 'utf8')
    }
    this.publish(notice, noop)
  }
  /**
   * Drops a client from the registry and updates the connected-clients count.
   *
   * Does nothing when `clientId` is not currently registered, so a repeated or
   * stray call cannot push `connectedClients` out of step with the registry
   * (or below zero).
   *
   * @param {string} clientId - Id of the client to remove.
   */
  Aedes.prototype.deleteClient = function (clientId) {
    // Own-property check: ids such as 'constructor' must not match members
    // inherited from Object.prototype.
    if (!Object.prototype.hasOwnProperty.call(this.clients, clientId)) {
      return
    }

    this.connectedClients--
    delete this.clients[clientId]
  }
  /**
   * Closes the registered client with the given id.
   * Runs with `this` set to an object exposing `clients`.
   *
   * @param {string} client - Id of the client (a key of `this.clients`).
   * @param {Function} cb - Passed to the client's `close`.
   */
  function closeClient (client, cb) {
    const target = this.clients[client]
    target.close(cb)
  }
  /**
   * Shuts the broker down: stops both timers, closes every registered client,
   * emits 'closed' and finally closes the message queue.
   *
   * @param {Function} [cb=noop] - Called at once if the broker is already
   *   closed, otherwise passed to `mq.close`.
   * @returns {*} The result of `cb()` when already closed, else `undefined`.
   */
  Aedes.prototype.close = function (cb = noop) {
    if (this.closed) {
      return cb()
    }

    this.closed = true
    clearInterval(this._heartbeatInterval)
    clearInterval(this._clearWillInterval)

    const clientIds = Object.keys(this.clients)
    this._parallel(this, closeClient, clientIds, () => {
      this.emit('closed')
      this.mq.close(cb)
    })
  }
  // Every broker instance exposes the `version` field read from package.json.
  {
    const { version } = require('./package.json')
    Aedes.prototype.version = version
  }
  /**
   * Default `preConnect` hook: accepts every connection.
   *
   * @param {object} client - Unused.
   * @param {object} packet - Unused.
   * @param {Function} callback - Invoked as `callback(null, true)`.
   */
  function defaultPreConnect (client, packet, callback) {
    const accepted = true
    callback(null, accepted)
  }
  /**
   * Default `authenticate` hook: accepts every client.
   *
   * @param {object} client - Unused.
   * @param {*} username - Unused.
   * @param {*} password - Unused.
   * @param {Function} callback - Invoked as `callback(null, true)`.
   */
  function defaultAuthenticate (client, username, password, callback) {
    const authenticated = true
    callback(null, authenticated)
  }
  /**
   * Default `authorizePublish` hook: rejects topics that start with
   * `$SYS_PREFIX` and allows everything else.
   *
   * @param {object} client - Unused.
   * @param {{topic: string}} packet - Packet about to be published.
   * @param {Function} callback - Receives an Error for reserved topics,
   *   `null` otherwise.
   * @returns {*} The callback's return value on rejection, else `undefined`.
   */
  function defaultAuthorizePublish (client, packet, callback) {
    const isReserved = packet.topic.startsWith($SYS_PREFIX)

    if (isReserved) {
      const rejection = new Error($SYS_PREFIX + ' topic is reserved')
      return callback(rejection)
    }

    callback(null)
  }
  /**
   * Default `authorizeSubscribe` hook: approves the subscription unchanged.
   *
   * @param {object} client - Unused.
   * @param {object} sub - Requested subscription.
   * @param {Function} callback - Invoked as `callback(null, sub)`.
   */
  function defaultAuthorizeSubscribe (client, sub, callback) {
    const approved = sub
    callback(null, approved)
  }
  /**
   * Default `authorizeForward` hook: forwards every packet untouched.
   *
   * @param {object} client - Unused.
   * @param {object} packet - Packet about to be forwarded.
   * @returns {object} The very same `packet`.
   */
  function defaultAuthorizeForward (client, packet) {
    const forwarded = packet
    return forwarded
  }
  /**
   * Default `published` hook: reports success without doing anything else.
   *
   * @param {object} packet - Unused.
   * @param {object} client - Unused.
   * @param {Function} callback - Invoked as `callback(null)`.
   */
  function defaultPublished (packet, client, callback) {
    const noError = null
    callback(noError)
  }
  /**
   * Context object for one publish; the publish steps read it through `this`.
   *
   * @param {object} broker - Broker performing the publish.
   * @param {object|null} client - Originating client, if any.
   * @param {object} packet - Packet as it was passed to `publish`.
   */
  function PublishState (broker, client, packet) {
    Object.assign(this, { broker, client, packet })
  }
  /**
   * Shared do-nothing callback for calls whose result is not needed.
   */
  function noop () {
    // intentionally empty
  }