| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601 |
- 'use strict'
- const { test } = require('tap')
- const memory = require('aedes-persistence')
- const Faketimers = require('@sinonjs/fake-timers')
- const { setup, connect, noError } = require('./helper')
- const aedes = require('../')
- function willConnect (s, opts, connected) {
- opts = opts || {}
- opts.will = {
- topic: 'mywill',
- payload: Buffer.from('last will'),
- qos: 0,
- retain: false
- }
- return connect(s, opts, connected)
- }
- test('delivers a will', function (t) {
- t.plan(4)
- const opts = {}
- // willConnect populates opts with a will
- const s = willConnect(setup(),
- opts,
- function () {
- s.conn.destroy()
- }
- )
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.mq.on('mywill', function (packet, cb) {
- t.equal(packet.topic, opts.will.topic, 'topic matches')
- t.same(packet.payload, opts.will.payload, 'payload matches')
- t.equal(packet.qos, opts.will.qos, 'qos matches')
- t.equal(packet.retain, opts.will.retain, 'retain matches')
- cb()
- })
- })
- test('calling close two times should not deliver two wills', function (t) {
- t.plan(4)
- const opts = {}
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- broker.on('client', function (client) {
- client.close()
- client.close()
- })
- broker.mq.on('mywill', onWill)
- // willConnect populates opts with a will
- willConnect(setup(broker), opts)
- function onWill (packet, cb) {
- broker.mq.removeListener('mywill', onWill, function () {
- broker.mq.on('mywill', function (packet) {
- t.fail('the will must be delivered only once')
- })
- })
- t.equal(packet.topic, opts.will.topic, 'topic matches')
- t.same(packet.payload, opts.will.payload, 'payload matches')
- t.equal(packet.qos, opts.will.qos, 'qos matches')
- t.equal(packet.retain, opts.will.retain, 'retain matches')
- cb()
- }
- })
- test('delivers old will in case of a crash', function (t) {
- t.plan(8)
- const persistence = memory()
- const will = {
- topic: 'mywill',
- payload: Buffer.from('last will'),
- qos: 0,
- retain: false
- }
- persistence.broker = {
- id: 'anotherBroker'
- }
- persistence.putWill({
- id: 'myClientId42'
- }, will, function (err) {
- t.error(err, 'no error')
- let authorized = false
- const interval = 10 // ms, so that the will check happens fast!
- const broker = aedes({
- persistence,
- heartbeatInterval: interval,
- authorizePublish: function (client, packet, callback) {
- t.strictSame(client, null, 'client must be null')
- authorized = true
- callback(null)
- }
- })
- t.teardown(broker.close.bind(broker))
- const start = Date.now()
- broker.mq.on('mywill', check)
- function check (packet, cb) {
- broker.mq.removeListener('mywill', check, function () {
- broker.mq.on('mywill', function (packet) {
- t.fail('the will must be delivered only once')
- })
- })
- t.ok(Date.now() - start >= 3 * interval, 'the will needs to be emitted after 3 heartbeats')
- t.equal(packet.topic, will.topic, 'topic matches')
- t.same(packet.payload, will.payload, 'payload matches')
- t.equal(packet.qos, will.qos, 'qos matches')
- t.equal(packet.retain, will.retain, 'retain matches')
- t.equal(authorized, true, 'authorization called')
- cb()
- }
- })
- })
- test('deliver old will without authorization in case of a crash', function (t) {
- t.plan(2)
- const persistence = memory()
- const will = {
- topic: 'mywill',
- payload: Buffer.from('last will'),
- qos: 0,
- retain: false
- }
- persistence.broker = {
- id: 'anotherBroker'
- }
- persistence.putWill({
- id: 'myClientId42'
- }, will, function (err) {
- t.error(err, 'no error')
- const interval = 10 // ms, so that the will check happens fast!
- const broker = aedes({
- persistence,
- heartbeatInterval: interval,
- authorizePublish: function (client, packet, callback) {
- t.strictSame(client, null, 'client must be null')
- callback(new Error())
- }
- })
- t.teardown(broker.close.bind(broker))
- broker.mq.on('mywill', check)
- function check (packet, cb) {
- t.fail('received will without authorization')
- cb()
- }
- })
- })
- test('delete old broker', function (t) {
- t.plan(1)
- const clock = Faketimers.install()
- const heartbeatInterval = 100
- const broker = aedes({
- heartbeatInterval
- })
- t.teardown(broker.close.bind(broker))
- const brokerId = 'dummyBroker'
- broker.brokers[brokerId] = Date.now() - heartbeatInterval * 3.5
- setTimeout(() => {
- t.equal(broker.brokers[brokerId], undefined, 'Broker deleted')
- }, heartbeatInterval * 4)
- clock.tick(heartbeatInterval * 4)
- clock.uninstall()
- })
- test('store the will in the persistence', function (t) {
- t.plan(5)
- const opts = {
- clientId: 'abcde'
- }
- // willConnect populates opts with a will
- const s = willConnect(setup(), opts)
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('client', function () {
- // this is connack
- s.broker.persistence.getWill({
- id: opts.clientId
- }, function (err, packet) {
- t.error(err, 'no error')
- t.same(packet.topic, opts.will.topic, 'will topic matches')
- t.same(packet.payload, opts.will.payload, 'will payload matches')
- t.same(packet.qos, opts.will.qos, 'will qos matches')
- t.same(packet.retain, opts.will.retain, 'will retain matches')
- })
- })
- })
- test('delete the will in the persistence after publish', function (t) {
- t.plan(2)
- const opts = {
- clientId: 'abcde'
- }
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- broker.on('client', function (client) {
- setImmediate(function () {
- client.close()
- })
- })
- broker.mq.on('mywill', check)
- // willConnect populates opts with a will
- willConnect(setup(broker), opts)
- function check (packet, cb) {
- broker.mq.removeListener('mywill', check, function () {
- broker.persistence.getWill({
- id: opts.clientId
- }, function (err, p) {
- t.error(err, 'no error')
- t.notOk(p, 'packet is empty')
- })
- })
- cb()
- }
- })
- test('delivers a will with authorization', function (t) {
- t.plan(6)
- let authorized = false
- const opts = {}
- // willConnect populates opts with a will
- const s = willConnect(
- setup(aedes({
- authorizePublish: (client, packet, callback) => {
- authorized = true
- callback(null)
- }
- })),
- opts,
- function () {
- s.conn.destroy()
- }
- )
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('clientDisconnect', function (client) {
- t.equal(client.connected, false)
- })
- s.broker.mq.on('mywill', function (packet, cb) {
- t.equal(packet.topic, opts.will.topic, 'topic matches')
- t.same(packet.payload, opts.will.payload, 'payload matches')
- t.equal(packet.qos, opts.will.qos, 'qos matches')
- t.equal(packet.retain, opts.will.retain, 'retain matches')
- t.equal(authorized, true, 'authorization called')
- cb()
- })
- })
- test('delivers a will waits for authorization', function (t) {
- t.plan(6)
- let authorized = false
- const opts = {}
- // willConnect populates opts with a will
- const s = willConnect(
- setup(aedes({
- authorizePublish: (client, packet, callback) => {
- authorized = true
- setImmediate(() => { callback(null) })
- }
- })),
- opts,
- function () {
- s.conn.destroy()
- }
- )
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('clientDisconnect', function () {
- t.pass('client is disconnected')
- })
- s.broker.mq.on('mywill', function (packet, cb) {
- t.equal(packet.topic, opts.will.topic, 'topic matches')
- t.same(packet.payload, opts.will.payload, 'payload matches')
- t.equal(packet.qos, opts.will.qos, 'qos matches')
- t.equal(packet.retain, opts.will.retain, 'retain matches')
- t.equal(authorized, true, 'authorization called')
- cb()
- })
- })
- test('does not deliver a will without authorization', function (t) {
- t.plan(1)
- let authorized = false
- const opts = {}
- // willConnect populates opts with a will
- const s = willConnect(
- setup(aedes({
- authorizePublish: (username, packet, callback) => {
- authorized = true
- callback(new Error())
- }
- })),
- opts,
- function () {
- s.conn.destroy()
- }
- )
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('clientDisconnect', function () {
- t.equal(authorized, true, 'authorization called')
- })
- s.broker.mq.on('mywill', function (packet, cb) {
- t.fail('received will without authorization')
- cb()
- })
- })
- test('does not deliver a will without authentication', function (t) {
- t.plan(1)
- let authenticated = false
- const opts = {}
- // willConnect populates opts with a will
- const s = willConnect(
- setup(aedes({
- authenticate: (client, username, password, callback) => {
- authenticated = true
- callback(new Error(), false)
- }
- })),
- opts
- )
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('clientError', function () {
- t.equal(authenticated, true, 'authentication called')
- t.end()
- })
- s.broker.mq.on('mywill', function (packet, cb) {
- t.fail('received will without authentication')
- cb()
- })
- })
- test('does not deliver will if broker is closed during authentication', function (t) {
- t.plan(0)
- const opts = { keepalive: 1 }
- const broker = aedes({
- authenticate: function (client, username, password, callback) {
- setTimeout(function () {
- callback(null, true)
- })
- broker.close()
- }
- })
- broker.on('keepaliveTimeout', function () {
- t.fail('keepalive timer shoud not be set')
- })
- broker.mq.on('mywill', function (packet, cb) {
- t.fail('Received will when it was not expected')
- cb()
- })
- willConnect(setup(broker), opts)
- })
- // [MQTT-3.14.4-3]
- test('does not deliver will when client sends a DISCONNECT', function (t) {
- t.plan(0)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const s = noError(willConnect(setup(broker), {}, function () {
- s.inStream.end({
- cmd: 'disconnect'
- })
- }), t)
- s.broker.mq.on('mywill', function (packet, cb) {
- t.fail(packet)
- cb()
- })
- })
- test('deletes from persistence on DISCONNECT', function (t) {
- t.plan(2)
- const opts = {
- clientId: 'abcde'
- }
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const s = noError(willConnect(setup(broker), opts, function () {
- s.inStream.end({
- cmd: 'disconnect'
- })
- }), t)
- s.broker.persistence.getWill({
- id: opts.clientId
- }, function (err, packet) {
- t.error(err, 'no error')
- t.notOk(packet)
- })
- })
- test('does not store multiple will with same clientid', function (t) {
- t.plan(4)
- const opts = { clientId: 'abcde' }
- const broker = aedes()
- let s = noError(willConnect(setup(broker), opts, function () {
- // gracefully close client so no will is sent
- s.inStream.end({
- cmd: 'disconnect'
- })
- }), t)
- broker.on('clientDisconnect', function (client) {
- // reconnect same client with will
- s = willConnect(setup(broker), opts, function () {
- // check that there are not 2 will messages for the same clientid
- s.broker.persistence.delWill({ id: opts.clientId }, function (err, packet) {
- t.error(err, 'no error')
- t.equal(packet.clientId, opts.clientId, 'will packet found')
- s.broker.persistence.delWill({ id: opts.clientId }, function (err, packet) {
- t.error(err, 'no error')
- t.equal(!!packet, false, 'no duplicated packets')
- broker.close()
- })
- })
- })
- })
- })
- test('don\'t delivers a will if broker alive', function (t) {
- const persistence = memory()
- const will = {
- topic: 'mywill',
- payload: Buffer.from('last will'),
- qos: 0,
- retain: false
- }
- const oldBroker = 'broker1'
- persistence.broker = {
- id: oldBroker
- }
- persistence.putWill({
- id: 'myClientId42'
- }, will, function (err) {
- t.error(err, 'no error')
- const opts = {
- persistence,
- heartbeatInterval: 10
- }
- let count = 0
- const broker = aedes(opts)
- t.teardown(broker.close.bind(broker))
- const streamWill = persistence.streamWill
- persistence.streamWill = function () {
- // don't pass broker.brokers to streamWill
- return streamWill.call(persistence)
- }
- broker.mq.on('mywill', function (packet, cb) {
- t.fail('Will received')
- cb()
- })
- broker.mq.on('$SYS/+/heartbeat', function () {
- t.pass('Heartbeat received')
- broker.brokers[oldBroker] = Date.now()
- if (++count === 5) {
- t.end()
- }
- })
- })
- })
- test('handle will publish error', function (t) {
- t.plan(2)
- const persistence = memory()
- const will = {
- topic: 'mywill',
- payload: Buffer.from('last will'),
- qos: 0,
- retain: false
- }
- persistence.broker = {
- id: 'broker1'
- }
- persistence.putWill({
- id: 'myClientId42'
- }, will, function (err) {
- t.error(err, 'no error')
- const opts = {
- persistence,
- heartbeatInterval: 10
- }
- persistence.delWill = function (client, cb) {
- cb(new Error('Throws error'))
- }
- const broker = aedes(opts)
- t.teardown(broker.close.bind(broker))
- broker.once('error', function (err) {
- t.equal('Throws error', err.message, 'throws error')
- })
- })
- })
- test('handle will publish error 2', function (t) {
- t.plan(2)
- const persistence = memory()
- const will = {
- topic: 'mywill',
- payload: Buffer.from('last will'),
- qos: 0,
- retain: true
- }
- persistence.broker = {
- id: 'broker1'
- }
- persistence.putWill({
- id: 'myClientId42'
- }, will, function (err) {
- t.error(err, 'no error')
- const opts = {
- persistence,
- heartbeatInterval: 10
- }
- persistence.storeRetained = function (packet, cb) {
- cb(new Error('Throws error'))
- }
- const broker = aedes(opts)
- t.teardown(broker.close.bind(broker))
- broker.once('error', function (err) {
- t.equal('Throws error', err.message, 'throws error')
- })
- })
- })
|