client.js 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. 'use strict'
  2. const mqtt = require('mqtt-packet')
  3. const EventEmitter = require('events')
  4. const util = require('util')
  5. const eos = require('end-of-stream')
  6. const Packet = require('aedes-packet')
  7. const write = require('./write')
  8. const QoSPacket = require('./qos-packet')
  9. const handleSubscribe = require('./handlers/subscribe')
  10. const handleUnsubscribe = require('./handlers/unsubscribe')
  11. const handle = require('./handlers')
  12. const { pipeline } = require('stream')
  13. const { through } = require('./utils')
  14. module.exports = Client
  15. function Client (broker, conn, req) {
  16. const that = this
  17. // metadata
  18. this.closed = false
  19. this.connecting = false
  20. this.connected = false
  21. this.connackSent = false
  22. this.errored = false
  23. // mqtt params
  24. this.id = null
  25. this.clean = true
  26. this.version = null
  27. this.subscriptions = {}
  28. this.duplicates = {}
  29. this.broker = broker
  30. this.conn = conn
  31. conn.client = this
  32. this._disconnected = false
  33. this._authorized = false
  34. this._parsingBatch = 1
  35. this._nextId = Math.ceil(Math.random() * 65535)
  36. this.req = req
  37. this.connDetails = req ? req.connDetails : null
  38. // we use two variables for the will
  39. // because we store in _will while
  40. // we are authenticating
  41. this.will = null
  42. this._will = null
  43. this._parser = mqtt.parser()
  44. this._parser.client = this
  45. this._parser._queue = [] // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event
  46. this._parser.on('packet', enqueue)
  47. this.once('connected', dequeue)
  48. function nextBatch (err) {
  49. if (err) {
  50. that.emit('error', err)
  51. return
  52. }
  53. const client = that
  54. if (client._paused) {
  55. return
  56. }
  57. that._parsingBatch--
  58. if (that._parsingBatch <= 0) {
  59. that._parsingBatch = 0
  60. const buf = client.conn.read(null)
  61. if (buf) {
  62. client._parser.parse(buf)
  63. }
  64. }
  65. }
  66. this._nextBatch = nextBatch
  67. conn.on('readable', nextBatch)
  68. this.on('error', onError)
  69. conn.on('error', this.emit.bind(this, 'error'))
  70. this._parser.on('error', this.emit.bind(this, 'error'))
  71. conn.on('end', this.close.bind(this))
  72. this._eos = eos(this.conn, this.close.bind(this))
  73. const getToForwardPacket = (_packet) => {
  74. // Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169
  75. // prevent to forward messages sent by the same client when no-local flag is set
  76. if (_packet.clientId === that.id && _packet.nl) return
  77. const toForward = dedupe(that, _packet) &&
  78. that.broker.authorizeForward(that, _packet)
  79. return toForward
  80. }
  81. this.deliver0 = function deliverQoS0 (_packet, cb) {
  82. const toForward = getToForwardPacket(_packet)
  83. if (toForward) {
  84. // Give nodejs some time to clear stacks, or we will see
  85. // "Maximum call stack size exceeded" in a very high load
  86. setImmediate(() => {
  87. const packet = new Packet(toForward, broker)
  88. packet.qos = 0
  89. write(that, packet, function (err) {
  90. that._onError(err)
  91. cb() // don't pass the error here or it will be thrown by mqemitter
  92. })
  93. })
  94. } else {
  95. setImmediate(cb)
  96. }
  97. }
  98. this.deliverQoS = function deliverQoS (_packet, cb) {
  99. // downgrade to qos0 if requested by publish
  100. if (_packet.qos === 0) {
  101. that.deliver0(_packet, cb)
  102. return
  103. }
  104. const toForward = getToForwardPacket(_packet)
  105. if (toForward) {
  106. setImmediate(() => {
  107. const packet = new QoSPacket(toForward, that)
  108. // Downgrading to client subscription qos if needed
  109. const clientSub = that.subscriptions[packet.topic]
  110. if (clientSub && (clientSub.qos || 0) < packet.qos) {
  111. packet.qos = clientSub.qos
  112. }
  113. packet.writeCallback = cb
  114. if (that.clean || packet.retain) {
  115. writeQoS(null, that, packet)
  116. } else {
  117. broker.persistence.outgoingUpdate(that, packet, writeQoS)
  118. }
  119. })
  120. } else if (that.clean === false) {
  121. that.broker.persistence.outgoingClearMessageId(that, _packet, noop)
  122. // we consider this to be an error, since the packet is undefined
  123. // so there's nothing to send
  124. setImmediate(cb)
  125. } else {
  126. setImmediate(cb)
  127. }
  128. }
  129. this._keepaliveTimer = null
  130. this._keepaliveInterval = -1
  131. this._connectTimer = setTimeout(function () {
  132. that.emit('error', new Error('connect did not arrive in time'))
  133. }, broker.connectTimeout)
  134. }
  135. function dedupe (client, packet) {
  136. const id = packet.brokerId
  137. if (!id) {
  138. return true
  139. }
  140. const duplicates = client.duplicates
  141. const counter = packet.brokerCounter
  142. const result = (duplicates[id] || 0) < counter
  143. if (result) {
  144. duplicates[id] = counter
  145. }
  146. return result
  147. }
  148. function writeQoS (err, client, packet) {
  149. if (err) {
  150. // is this right, or we should ignore thins?
  151. client.emit('error', err)
  152. // don't pass the error here or it will be thrown by mqemitter
  153. packet.writeCallback()
  154. } else {
  155. write(client, packet, function (err) {
  156. if (err) {
  157. client.emit('error', err)
  158. }
  159. // don't pass the error here or it will be thrown by mqemitter
  160. packet.writeCallback()
  161. })
  162. }
  163. }
  164. function drainRequest (req) {
  165. req.callback()
  166. }
  167. function onError (err) {
  168. if (!err) return
  169. this.errored = true
  170. this.conn.removeAllListeners('error')
  171. this.conn.on('error', noop)
  172. // hack to clean up the write callbacks in case of error
  173. const state = this.conn._writableState
  174. const list = typeof state.getBuffer === 'function' ? state.getBuffer() : state.buffer
  175. list.forEach(drainRequest)
  176. this.broker.emit(this.id ? 'clientError' : 'connectionError', this, err)
  177. this.close()
  178. }
  179. util.inherits(Client, EventEmitter)
  180. Client.prototype._onError = onError
  181. Client.prototype.publish = function (message, done) {
  182. const packet = new Packet(message, this.broker)
  183. const that = this
  184. if (packet.qos === 0) {
  185. // skip offline and send it as it is
  186. this.deliver0(packet, done)
  187. return
  188. }
  189. if (!this.clean && this.id) {
  190. this.broker.persistence.outgoingEnqueue({
  191. clientId: this.id
  192. }, packet, function deliver (err) {
  193. if (err) {
  194. return done(err)
  195. }
  196. that.deliverQoS(packet, done)
  197. })
  198. } else {
  199. that.deliverQoS(packet, done)
  200. }
  201. }
  202. Client.prototype.subscribe = function (packet, done) {
  203. if (!packet.subscriptions) {
  204. if (!Array.isArray(packet)) {
  205. packet = [packet]
  206. }
  207. packet = {
  208. subscriptions: packet
  209. }
  210. }
  211. handleSubscribe(this, packet, false, done)
  212. }
  213. Client.prototype.unsubscribe = function (packet, done) {
  214. if (!packet.unsubscriptions) {
  215. if (!Array.isArray(packet)) {
  216. packet = [packet]
  217. }
  218. packet = {
  219. unsubscriptions: packet
  220. }
  221. }
  222. handleUnsubscribe(this, packet, done)
  223. }
  224. Client.prototype.close = function (done) {
  225. if (this.closed) {
  226. if (typeof done === 'function') {
  227. done()
  228. }
  229. return
  230. }
  231. const that = this
  232. const conn = this.conn
  233. this.closed = true
  234. this._parser.removeAllListeners('packet')
  235. conn.removeAllListeners('readable')
  236. this._parser._queue = null
  237. if (this._keepaliveTimer) {
  238. this._keepaliveTimer.clear()
  239. this._keepaliveInterval = -1
  240. this._keepaliveTimer = null
  241. }
  242. if (this._connectTimer) {
  243. clearTimeout(this._connectTimer)
  244. this._connectTimer = null
  245. }
  246. this._eos()
  247. this._eos = noop
  248. handleUnsubscribe(
  249. this,
  250. {
  251. unsubscriptions: Object.keys(this.subscriptions)
  252. },
  253. finish)
  254. function finish () {
  255. const will = that.will
  256. // _disconnected is set only if client is disconnected with a valid disconnect packet
  257. if (!that._disconnected && will) {
  258. that.broker.authorizePublish(that, will, function (err) {
  259. if (err) { return done() }
  260. that.broker.publish(will, that, done)
  261. function done () {
  262. that.broker.persistence.delWill({
  263. id: that.id,
  264. brokerId: that.broker.id
  265. }, noop)
  266. }
  267. })
  268. } else if (will) {
  269. // delete the persisted will even on clean disconnect https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349232
  270. that.broker.persistence.delWill({
  271. id: that.id,
  272. brokerId: that.broker.id
  273. }, noop)
  274. }
  275. that.will = null // this function might be called twice
  276. that._will = null
  277. that.connected = false
  278. that.connecting = false
  279. conn.removeAllListeners('error')
  280. conn.on('error', noop)
  281. if (that.broker.clients[that.id] && that._authorized) {
  282. that.broker.unregisterClient(that)
  283. }
  284. // clear up the drain event listeners
  285. that.conn.emit('drain')
  286. that.conn.removeAllListeners('drain')
  287. conn.destroy()
  288. if (typeof done === 'function') {
  289. done()
  290. }
  291. }
  292. }
  293. Client.prototype.pause = function () {
  294. this._paused = true
  295. }
  296. Client.prototype.resume = function () {
  297. this._paused = false
  298. this._nextBatch()
  299. }
  300. function enqueue (packet) {
  301. const client = this.client
  302. client._parsingBatch++
  303. // already connected or it's the first packet
  304. if (client.connackSent || client._parsingBatch === 1) {
  305. handle(client, packet, client._nextBatch)
  306. } else {
  307. if (this._queue.length < client.broker.queueLimit) {
  308. this._queue.push(packet)
  309. } else {
  310. this.emit('error', new Error('Client queue limit reached'))
  311. }
  312. }
  313. }
  314. function dequeue () {
  315. const q = this._parser._queue
  316. if (q) {
  317. for (let i = 0, len = q.length; i < len; i++) {
  318. handle(this, q[i], this._nextBatch)
  319. }
  320. this._parser._queue = null
  321. }
  322. }
  323. Client.prototype.emptyOutgoingQueue = function (done) {
  324. const client = this
  325. const persistence = client.broker.persistence
  326. function filter (packet, enc, next) {
  327. persistence.outgoingClearMessageId(client, packet, next)
  328. }
  329. pipeline(
  330. persistence.outgoingStream(client),
  331. through(filter),
  332. done
  333. )
  334. }
  335. function noop () {}