| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- 'use strict'
- const EventEmitter = require('events')
- const util = require('util')
- const parallel = require('fastparallel')
- const series = require('fastseries')
- const { v4: uuidv4 } = require('uuid')
- const reusify = require('reusify')
- const { pipeline } = require('stream')
- const Packet = require('aedes-packet')
- const memory = require('aedes-persistence')
- const mqemitter = require('mqemitter')
- const Client = require('./lib/client')
- const { $SYS_PREFIX, bulk } = require('./lib/utils')
- module.exports = Aedes.createBroker = Aedes
- const defaultOptions = {
- concurrency: 100,
- heartbeatInterval: 60000, // 1 minute
- connectTimeout: 30000, // 30 secs
- decodeProtocol: null,
- preConnect: defaultPreConnect,
- authenticate: defaultAuthenticate,
- authorizePublish: defaultAuthorizePublish,
- authorizeSubscribe: defaultAuthorizeSubscribe,
- authorizeForward: defaultAuthorizeForward,
- published: defaultPublished,
- trustProxy: false,
- trustedProxies: [],
- queueLimit: 42,
- maxClientsIdLength: 23,
- keepaliveLimit: 0
- }
- function Aedes (opts) {
- const that = this
- if (!(this instanceof Aedes)) {
- return new Aedes(opts)
- }
- opts = Object.assign({}, defaultOptions, opts)
- this.id = opts.id || uuidv4()
- // +1 when construct a new aedes-packet
- // internal track for last brokerCounter
- this.counter = 0
- this.queueLimit = opts.queueLimit
- this.connectTimeout = opts.connectTimeout
- this.keepaliveLimit = opts.keepaliveLimit
- this.maxClientsIdLength = opts.maxClientsIdLength
- this.mq = opts.mq || mqemitter({
- concurrency: opts.concurrency,
- matchEmptyLevels: true // [MQTT-4.7.1-3]
- })
- this.handle = function handle (conn, req) {
- conn.setMaxListeners(opts.concurrency * 2)
- // create a new Client instance for a new connection
- // return, just to please standard
- return new Client(that, conn, req)
- }
- this.persistence = opts.persistence || memory()
- this.persistence.broker = this
- this._parallel = parallel()
- this._series = series()
- this._enqueuers = reusify(DoEnqueues)
- this.preConnect = opts.preConnect
- this.authenticate = opts.authenticate
- this.authorizePublish = opts.authorizePublish
- this.authorizeSubscribe = opts.authorizeSubscribe
- this.authorizeForward = opts.authorizeForward
- this.published = opts.published
- this.decodeProtocol = opts.decodeProtocol
- this.trustProxy = opts.trustProxy
- this.trustedProxies = opts.trustedProxies
- this.clients = {}
- this.brokers = {}
- const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat'
- const birthTopic = $SYS_PREFIX + that.id + '/birth'
- this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval)
- const bufId = Buffer.from(that.id, 'utf8')
- // in a cluster env this is used to warn other broker instances
- // that this broker is alive
- that.publish({
- topic: birthTopic,
- payload: bufId
- }, noop)
- function heartbeat () {
- that.publish({
- topic: heartbeatTopic,
- payload: bufId
- }, noop)
- }
- function deleteOldBrokers (broker) {
- if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
- delete that.brokers[broker]
- }
- }
- this._clearWillInterval = setInterval(function () {
- Object.keys(that.brokers).forEach(deleteOldBrokers)
- pipeline(
- that.persistence.streamWill(that.brokers),
- bulk(receiveWills),
- function done (err) {
- if (err) {
- that.emit('error', err)
- }
- }
- )
- }, opts.heartbeatInterval * 4)
- function receiveWills (chunks, done) {
- that._parallel(that, checkAndPublish, chunks, done)
- }
- function checkAndPublish (will, done) {
- const notPublish =
- that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()
- if (notPublish) return done()
- // randomize this, so that multiple brokers
- // do not publish the same wills at the same time
- this.authorizePublish(that.clients[will.clientId] || null, will, function (err) {
- if (err) { return doneWill() }
- that.publish(will, doneWill)
- function doneWill (err) {
- if (err) { return done(err) }
- that.persistence.delWill({
- id: will.clientId,
- brokerId: will.brokerId
- }, done)
- }
- })
- }
- this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) {
- that.brokers[packet.payload.toString()] = Date.now()
- done()
- })
- this.mq.on($SYS_PREFIX + '+/birth', function brokerBorn (packet, done) {
- const brokerId = packet.payload.toString()
- // reset duplicates counter
- if (brokerId !== that.id) {
- for (const clientId in that.clients) {
- delete that.clients[clientId].duplicates[brokerId]
- }
- }
- done()
- })
- this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) {
- const serverId = packet.topic.split('/')[1]
- const clientId = packet.payload.toString()
- if (that.clients[clientId] && serverId !== that.id) {
- if (that.clients[clientId].closed) {
- // remove the client from the list if it is already closed
- that.deleteClient(clientId)
- done()
- } else {
- that.clients[clientId].close(done)
- }
- } else {
- done()
- }
- })
- // metadata
- this.connectedClients = 0
- this.closed = false
- }
- util.inherits(Aedes, EventEmitter)
- function storeRetained (packet, done) {
- if (packet.retain) {
- this.broker.persistence.storeRetained(packet, done)
- } else {
- done()
- }
- }
- function emitPacket (packet, done) {
- if (this.client) packet.clientId = this.client.id
- this.broker.mq.emit(packet, done)
- }
- function enqueueOffline (packet, done) {
- const enqueuer = this.broker._enqueuers.get()
- enqueuer.complete = done
- enqueuer.packet = packet
- enqueuer.topic = packet.topic
- enqueuer.broker = this.broker
- this.broker.persistence.subscriptionsByTopic(
- packet.topic,
- enqueuer.done
- )
- }
- function DoEnqueues () {
- this.next = null
- this.complete = null
- this.packet = null
- this.topic = null
- this.broker = null
- const that = this
- this.done = function doneEnqueue (err, subs) {
- const broker = that.broker
- if (err) {
- // is this really recoverable?
- // let's just error the whole aedes
- // https://nodejs.org/api/events.html#events_error_events
- broker.emit('error', err)
- return
- }
- if (that.topic.indexOf($SYS_PREFIX) === 0) {
- subs = subs.filter(removeSharp)
- }
- const packet = that.packet
- const complete = that.complete
- that.packet = null
- that.complete = null
- that.topic = null
- broker.persistence.outgoingEnqueueCombi(subs, packet, complete)
- broker._enqueuers.release(that)
- }
- }
- // + is 43
- // # is 35
- function removeSharp (sub) {
- const code = sub.topic.charCodeAt(0)
- return code !== 43 && code !== 35
- }
- function callPublished (_, done) {
- this.broker.published(this.packet, this.client, done)
- this.broker.emit('publish', this.packet, this.client)
- }
- const publishFuncsSimple = [
- storeRetained,
- emitPacket,
- callPublished
- ]
- const publishFuncsQoS = [
- storeRetained,
- enqueueOffline,
- emitPacket,
- callPublished
- ]
- Aedes.prototype.publish = function (packet, client, done) {
- if (typeof client === 'function') {
- done = client
- client = null
- }
- const p = new Packet(packet, this)
- const publishFuncs = p.qos > 0 ? publishFuncsQoS : publishFuncsSimple
- this._series(new PublishState(this, client, packet), publishFuncs, p, done)
- }
- Aedes.prototype.subscribe = function (topic, func, done) {
- this.mq.on(topic, func, done)
- }
- Aedes.prototype.unsubscribe = function (topic, func, done) {
- this.mq.removeListener(topic, func, done)
- }
- Aedes.prototype.registerClient = function (client) {
- const that = this
- if (this.clients[client.id]) {
- // [MQTT-3.1.4-2]
- this.clients[client.id].close(function closeClient () {
- that._finishRegisterClient(client)
- })
- } else {
- this._finishRegisterClient(client)
- }
- }
- Aedes.prototype._finishRegisterClient = function (client) {
- this.connectedClients++
- this.clients[client.id] = client
- this.emit('client', client)
- this.publish({
- topic: $SYS_PREFIX + this.id + '/new/clients',
- payload: Buffer.from(client.id, 'utf8')
- }, noop)
- }
- Aedes.prototype.unregisterClient = function (client) {
- this.deleteClient(client.id)
- this.emit('clientDisconnect', client)
- this.publish({
- topic: $SYS_PREFIX + this.id + '/disconnect/clients',
- payload: Buffer.from(client.id, 'utf8')
- }, noop)
- }
- Aedes.prototype.deleteClient = function (clientId) {
- this.connectedClients--
- delete this.clients[clientId]
- }
- function closeClient (client, cb) {
- this.clients[client].close(cb)
- }
- Aedes.prototype.close = function (cb = noop) {
- const that = this
- if (this.closed) {
- return cb()
- }
- this.closed = true
- clearInterval(this._heartbeatInterval)
- clearInterval(this._clearWillInterval)
- this._parallel(this, closeClient, Object.keys(this.clients), doneClose)
- function doneClose () {
- that.emit('closed')
- that.mq.close(cb)
- }
- }
- Aedes.prototype.version = require('./package.json').version
- function defaultPreConnect (client, packet, callback) {
- callback(null, true)
- }
- function defaultAuthenticate (client, username, password, callback) {
- callback(null, true)
- }
- function defaultAuthorizePublish (client, packet, callback) {
- if (packet.topic.startsWith($SYS_PREFIX)) {
- return callback(new Error($SYS_PREFIX + ' topic is reserved'))
- }
- callback(null)
- }
- function defaultAuthorizeSubscribe (client, sub, callback) {
- callback(null, sub)
- }
- function defaultAuthorizeForward (client, packet) {
- return packet
- }
- function defaultPublished (packet, client, callback) {
- callback(null)
- }
- function PublishState (broker, client, packet) {
- this.broker = broker
- this.client = client
- this.packet = packet
- }
- function noop () {}
|