| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865 |
- 'use strict'
- const { test } = require('tap')
- const eos = require('end-of-stream')
- const { setup, connect, subscribe, subscribeMultiple, noError } = require('./helper')
- const aedes = require('../')
- const proxyquire = require('proxyquire')
- test('test aedes.createBroker', function (t) {
- t.plan(1)
- const broker = aedes.createBroker()
- t.teardown(broker.close.bind(broker))
- connect(setup(broker), {}, function () {
- t.pass('connected')
- })
- })
- test('publish QoS 0', function (t) {
- t.plan(2)
- const s = connect(setup(), { clientId: 'my-client-xyz-5' })
- t.teardown(s.broker.close.bind(s.broker))
- const expected = {
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world'),
- qos: 0,
- retain: false,
- dup: false,
- clientId: 'my-client-xyz-5'
- }
- s.broker.mq.on('hello', function (packet, cb) {
- expected.brokerId = s.broker.id
- expected.brokerCounter = s.broker.counter
- t.equal(packet.messageId, undefined, 'MUST not contain a packet identifier in QoS 0')
- t.same(packet, expected, 'packet matches')
- cb()
- })
- s.inStream.write({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- })
- })
- test('messageId shoud reset to 1 if it reached 65535', function (t) {
- t.plan(7)
- const s = connect(setup())
- t.teardown(s.broker.close.bind(s.broker))
- const publishPacket = {
- cmd: 'publish',
- topic: 'hello',
- payload: 'world',
- qos: 1,
- messageId: 42
- }
- let count = 0
- s.broker.on('clientReady', function (client) {
- subscribe(t, s, 'hello', 1, function () {
- client._nextId = 65535
- s.outStream.on('data', function (packet) {
- if (packet.cmd === 'puback') {
- t.equal(packet.messageId, 42)
- }
- if (packet.cmd === 'publish') {
- t.equal(packet.messageId, count++ === 0 ? 65535 : 1)
- }
- })
- s.inStream.write(publishPacket)
- s.inStream.write(publishPacket)
- })
- })
- })
- test('publish empty topic throws error', function (t) {
- t.plan(1)
- const s = connect(setup())
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'publish',
- topic: '',
- payload: 'world'
- })
- s.broker.on('clientError', function (client, err) {
- t.pass('should emit error')
- })
- })
- test('publish to $SYS topic throws error', function (t) {
- t.plan(1)
- const s = connect(setup())
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'publish',
- topic: '$SYS/not/allowed',
- payload: 'world'
- })
- s.broker.on('clientError', function (client, err) {
- t.pass('should emit error')
- })
- })
- ;[{ qos: 0, clean: false }, { qos: 0, clean: true }, { qos: 1, clean: false }, { qos: 1, clean: true }].forEach(function (ele) {
- test('subscribe a single topic in QoS ' + ele.qos + ' [clean=' + ele.clean + ']', function (t) {
- t.plan(5)
- const s = connect(setup(), { clean: ele.clean })
- t.teardown(s.broker.close.bind(s.broker))
- const expected = {
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world'),
- dup: false,
- length: 12,
- qos: 0,
- retain: false
- }
- const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }]
- subscribe(t, s, 'hello', ele.qos, function () {
- s.outStream.once('data', function (packet) {
- t.same(packet, expected, 'packet matches')
- })
- s.broker.persistence.subscriptionsByClient(s.client, function (_, subs) {
- t.same(subs, expectedSubs)
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- })
- })
- })
- })
- // Catch invalid packet writeToStream errors
- test('return write errors to callback', function (t) {
- t.plan(1)
- const write = proxyquire('../lib/write.js', {
- 'mqtt-packet': {
- writeToStream: () => {
- throw Error('error')
- }
- }
- })
- const client = {
- conn: {
- writable: true
- },
- connecting: true
- }
- write(client, {}, function (err) {
- t.equal(err.message, 'packet received not valid', 'should return the error to callback')
- })
- })
- ;[{ qos: 0, clean: false }, { qos: 0, clean: true }, { qos: 1, clean: false }, { qos: 1, clean: true }].forEach(function (ele) {
- test('subscribe multipe topics in QoS ' + ele.qos + ' [clean=' + ele.clean + ']', function (t) {
- t.plan(5)
- const s = connect(setup(), { clean: ele.clean })
- t.teardown(s.broker.close.bind(s.broker))
- const expected = {
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world'),
- dup: false,
- length: 12,
- qos: 0,
- retain: false
- }
- const subs = [
- { topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined },
- { topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }
- ]
- const expectedSubs = ele.clean ? null : subs
- subscribeMultiple(t, s, subs, [ele.qos, ele.qos], function () {
- s.outStream.on('data', function (packet) {
- t.same(packet, expected, 'packet matches')
- })
- s.broker.persistence.subscriptionsByClient(s.client, function (_, saveSubs) {
- t.same(saveSubs, expectedSubs)
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- })
- })
- })
- })
- test('does not die badly on connection error', function (t) {
- t.plan(3)
- const s = connect(setup())
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'subscribe',
- messageId: 42,
- subscriptions: [{
- topic: 'hello',
- qos: 0
- }]
- })
- s.broker.on('clientError', function (client, err) {
- t.ok(client, 'client is passed')
- t.ok(err, 'err is passed')
- })
- s.outStream.on('data', function (packet) {
- s.conn.destroy()
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world')
- }, function () {
- t.pass('calls the callback')
- })
- })
- })
- // Guarded in mqtt-packet
- test('subscribe should have messageId', function (t) {
- t.plan(1)
- const s = connect(setup())
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'subscribe',
- subscriptions: [{
- topic: 'hello',
- qos: 0
- }]
- })
- s.broker.on('connectionError', function (client, err) {
- t.ok(err.message, 'Invalid messageId')
- })
- })
- test('subscribe with messageId 0 should return suback', function (t) {
- t.plan(1)
- const s = connect(setup())
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'subscribe',
- subscriptions: [{
- topic: 'hello',
- qos: 0
- }],
- messageId: 0
- })
- s.outStream.once('data', function (packet) {
- t.same(packet, {
- cmd: 'suback',
- messageId: 0,
- dup: false,
- length: 3,
- qos: 0,
- retain: false,
- granted: [
- 0
- ]
- }, 'packet matches')
- })
- })
- test('unsubscribe', function (t) {
- t.plan(5)
- const s = noError(connect(setup()), t)
- t.teardown(s.broker.close.bind(s.broker))
- subscribe(t, s, 'hello', 0, function () {
- s.inStream.write({
- cmd: 'unsubscribe',
- messageId: 43,
- unsubscriptions: ['hello']
- })
- s.outStream.once('data', function (packet) {
- t.same(packet, {
- cmd: 'unsuback',
- messageId: 43,
- dup: false,
- length: 2,
- qos: 0,
- retain: false
- }, 'packet matches')
- s.outStream.on('data', function (packet) {
- t.fail('packet received')
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- }, function () {
- t.pass('publish finished')
- })
- })
- })
- })
- test('unsubscribe without subscribe', function (t) {
- t.plan(1)
- const s = noError(connect(setup()), t)
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'unsubscribe',
- messageId: 43,
- unsubscriptions: ['hello']
- })
- s.outStream.once('data', function (packet) {
- t.same(packet, {
- cmd: 'unsuback',
- messageId: 43,
- dup: false,
- length: 2,
- qos: 0,
- retain: false
- }, 'packet matches')
- })
- })
- test('unsubscribe on disconnect for a clean=true client', function (t) {
- t.plan(6)
- const opts = { clean: true }
- const s = connect(setup(), opts)
- t.teardown(s.broker.close.bind(s.broker))
- subscribe(t, s, 'hello', 0, function () {
- s.conn.destroy(null, function () {
- t.pass('closed streams')
- })
- s.outStream.on('data', function () {
- t.fail('should not receive any more messages')
- })
- s.broker.once('unsubscribe', function () {
- t.pass('should emit unsubscribe')
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world')
- }, function () {
- t.pass('calls the callback')
- })
- })
- })
- test('unsubscribe on disconnect for a clean=false client', function (t) {
- t.plan(5)
- const opts = { clean: false }
- const s = connect(setup(), opts)
- t.teardown(s.broker.close.bind(s.broker))
- subscribe(t, s, 'hello', 0, function () {
- s.conn.destroy(null, function () {
- t.pass('closed streams')
- })
- s.outStream.on('data', function () {
- t.fail('should not receive any more messages')
- })
- s.broker.once('unsubscribe', function () {
- t.fail('should not emit unsubscribe')
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world')
- }, function () {
- t.pass('calls the callback')
- })
- })
- })
- test('disconnect', function (t) {
- t.plan(1)
- const s = noError(connect(setup()), t)
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('clientDisconnect', function () {
- t.pass('closed stream')
- })
- s.inStream.write({
- cmd: 'disconnect'
- })
- })
- test('disconnect client on wrong cmd', function (t) {
- t.plan(1)
- const s = noError(connect(setup()), t)
- t.teardown(s.broker.close.bind(s.broker))
- s.broker.on('clientDisconnect', function () {
- t.pass('closed stream')
- })
- s.broker.on('clientReady', function (c) {
- // don't use stream write here because it will throw an error on mqtt_packet genetete
- c._parser.emit('packet', { cmd: 'pippo' })
- })
- })
- test('client closes', function (t) {
- t.plan(5)
- const broker = aedes()
- const client = noError(connect(setup(broker), { clientId: 'abcde' }))
- broker.on('clientReady', function () {
- const brokerClient = broker.clients.abcde
- t.equal(brokerClient.connected, true, 'client connected')
- eos(client.conn, t.pass.bind(t, 'client closes'))
- setImmediate(() => {
- brokerClient.close(function () {
- t.equal(broker.clients.abcde, undefined, 'client instance is removed')
- })
- t.equal(brokerClient.connected, false, 'client disconnected')
- broker.close(function (err) {
- t.error(err, 'no error')
- })
- })
- })
- })
- test('broker closes', function (t) {
- t.plan(4)
- const broker = aedes()
- const client = noError(connect(setup(broker), {
- clientId: 'abcde'
- }, function () {
- eos(client.conn, t.pass.bind(t, 'client closes'))
- broker.close(function (err) {
- t.error(err, 'no error')
- t.ok(broker.closed)
- t.equal(broker.clients.abcde, undefined, 'client instance is removed')
- })
- }))
- })
- test('broker closes gracefully', function (t) {
- t.plan(7)
- const broker = aedes()
- const client1 = noError(connect(setup(broker), {
- }, function () {
- const client2 = noError(connect(setup(broker), {
- }, function () {
- t.equal(broker.connectedClients, 2, '2 connected clients')
- eos(client1.conn, t.pass.bind(t, 'client1 closes'))
- eos(client2.conn, t.pass.bind(t, 'client2 closes'))
- broker.close(function (err) {
- t.error(err, 'no error')
- t.ok(broker.mq.closed, 'broker mq closes')
- t.ok(broker.closed, 'broker closes')
- t.equal(broker.connectedClients, 0, 'no connected clients')
- })
- }))
- }))
- })
- test('testing other event', function (t) {
- t.plan(1)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const client = setup(broker)
- broker.on('connectionError', function (client, error) {
- t.notOk(client.id, null)
- })
- client.conn.emit('error', 'Connect not yet arrived')
- })
- test('connect without a clientId for MQTT 3.1.1', function (t) {
- t.plan(1)
- const s = setup()
- t.teardown(s.broker.close.bind(s.broker))
- s.inStream.write({
- cmd: 'connect',
- protocolId: 'MQTT',
- protocolVersion: 4,
- clean: true,
- keepalive: 0
- })
- s.outStream.on('data', function (packet) {
- t.same(packet, {
- cmd: 'connack',
- returnCode: 0,
- length: 2,
- qos: 0,
- retain: false,
- dup: false,
- topic: null,
- payload: null,
- sessionPresent: false
- }, 'successful connack')
- })
- })
- test('disconnect existing client with the same clientId', function (t) {
- t.plan(2)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const c1 = connect(setup(broker), {
- clientId: 'abcde'
- }, function () {
- eos(c1.conn, function () {
- t.pass('first client disconnected')
- })
- connect(setup(broker), {
- clientId: 'abcde'
- }, function () {
- t.pass('second client connected')
- })
- })
- })
- test('disconnect if another broker connects the same clientId', function (t) {
- t.plan(2)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const c1 = connect(setup(broker), {
- clientId: 'abcde'
- }, function () {
- eos(c1.conn, function () {
- t.pass('disconnect first client')
- })
- broker.publish({
- topic: '$SYS/anotherBroker/new/clients',
- payload: Buffer.from('abcde')
- }, function () {
- t.pass('second client connects to another broker')
- })
- })
- })
- test('publish to $SYS/broker/new/clients', function (t) {
- t.plan(1)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- broker.mq.on('$SYS/' + broker.id + '/new/clients', function (packet, done) {
- t.equal(packet.payload.toString(), 'abcde', 'clientId matches')
- done()
- })
- connect(setup(broker), {
- clientId: 'abcde'
- })
- })
- test('publish to $SYS/broker/new/subsribers and $SYS/broker/new/unsubsribers', function (t) {
- t.plan(7)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const sub = {
- topic: 'hello',
- qos: 0
- }
- broker.mq.on('$SYS/' + broker.id + '/new/subscribes', function (packet, done) {
- const payload = JSON.parse(packet.payload.toString())
- t.equal(payload.clientId, 'abcde', 'clientId matches')
- t.same(payload.subs, [sub], 'subscriptions matches')
- done()
- })
- broker.mq.on('$SYS/' + broker.id + '/new/unsubscribes', function (packet, done) {
- const payload = JSON.parse(packet.payload.toString())
- t.equal(payload.clientId, 'abcde', 'clientId matches')
- t.same(payload.subs, [sub.topic], 'unsubscriptions matches')
- done()
- })
- const subscriber = connect(setup(broker), {
- clean: false, clientId: 'abcde'
- }, function () {
- subscribe(t, subscriber, sub.topic, sub.qos, function () {
- subscriber.inStream.write({
- cmd: 'unsubscribe',
- messageId: 43,
- unsubscriptions: ['hello']
- })
- })
- })
- })
- test('restore QoS 0 subscriptions not clean', function (t) {
- t.plan(5)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- const expected = {
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world'),
- qos: 0,
- dup: false,
- length: 12,
- retain: false
- }
- let subscriber = connect(setup(broker), {
- clean: false, clientId: 'abcde'
- }, function () {
- subscribe(t, subscriber, 'hello', 0, function () {
- subscriber.inStream.end()
- const publisher = connect(setup(broker), {
- }, function () {
- subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
- t.equal(connect.sessionPresent, true, 'session present is set to true')
- publisher.inStream.write({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world',
- qos: 0
- })
- })
- subscriber.outStream.once('data', function (packet) {
- t.same(packet, expected, 'packet must match')
- })
- })
- })
- })
- })
- test('do not restore QoS 0 subscriptions when clean', function (t) {
- t.plan(5)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- let subscriber = connect(setup(broker), {
- clean: true, clientId: 'abcde'
- }, function () {
- subscribe(t, subscriber, 'hello', 0, function () {
- subscriber.inStream.end()
- subscriber.broker.persistence.subscriptionsByClient(broker.clients.abcde, function (_, subs, client) {
- t.equal(subs, null, 'no previous subscriptions restored')
- })
- const publisher = connect(setup(broker), {
- }, function () {
- subscriber = connect(setup(broker), {
- clean: true, clientId: 'abcde'
- }, function (connect) {
- t.equal(connect.sessionPresent, false, 'session present is set to false')
- publisher.inStream.write({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world',
- qos: 0
- })
- })
- subscriber.outStream.once('data', function (packet) {
- t.fail('packet received')
- })
- })
- })
- })
- })
- test('double sub does not double deliver', function (t) {
- t.plan(7)
- const expected = {
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world'),
- dup: false,
- length: 12,
- qos: 0,
- retain: false
- }
- const s = connect(setup(), {
- }, function () {
- subscribe(t, s, 'hello', 0, function () {
- subscribe(t, s, 'hello', 0, function () {
- s.outStream.once('data', function (packet) {
- t.same(packet, expected, 'packet matches')
- s.outStream.on('data', function () {
- t.fail('double deliver')
- })
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- })
- })
- })
- })
- t.teardown(s.broker.close.bind(s.broker))
- })
- test('overlapping sub does not double deliver', function (t) {
- t.plan(7)
- const expected = {
- cmd: 'publish',
- topic: 'hello',
- payload: Buffer.from('world'),
- dup: false,
- length: 12,
- qos: 0,
- retain: false
- }
- const s = connect(setup(), {
- }, function () {
- subscribe(t, s, 'hello', 0, function () {
- subscribe(t, s, 'hello/#', 0, function () {
- s.outStream.once('data', function (packet) {
- t.same(packet, expected, 'packet matches')
- s.outStream.on('data', function () {
- t.fail('double deliver')
- })
- })
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- })
- })
- })
- })
- t.teardown(s.broker.close.bind(s.broker))
- })
- test('clear drain', function (t) {
- t.plan(4)
- const s = connect(setup(), {
- }, function () {
- subscribe(t, s, 'hello', 0, function () {
- // fake a busy socket
- s.conn.write = function (chunk, enc, cb) {
- return false
- }
- s.broker.publish({
- cmd: 'publish',
- topic: 'hello',
- payload: 'world'
- }, function () {
- t.pass('callback called')
- })
- s.conn.destroy()
- })
- })
- t.teardown(s.broker.close.bind(s.broker))
- })
- test('id option', function (t) {
- t.plan(2)
- const broker1 = aedes()
- setup(broker1).conn.destroy()
- t.ok(broker1.id, 'broker gets random id when id option not set')
- const broker2 = aedes({ id: 'abc' })
- setup(broker2).conn.destroy()
- t.equal(broker2.id, 'abc', 'broker id equals id option when set')
- t.teardown(() => {
- broker1.close()
- broker2.close()
- })
- })
- test('not duplicate client close when client error occurs', function (t) {
- t.plan(1)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- connect(setup(broker))
- broker.on('client', function (client) {
- client.conn.on('drain', () => {
- t.pass('client closed ok')
- })
- client.close()
- // add back to test if there is duplicated close() call
- client.conn.on('drain', () => {
- t.fail('double client close calls')
- })
- })
- })
- test('not duplicate client close when double close() called', function (t) {
- t.plan(1)
- const broker = aedes()
- t.teardown(broker.close.bind(broker))
- connect(setup(broker))
- broker.on('clientReady', function (client) {
- client.conn.on('drain', () => {
- t.pass('client closed ok')
- })
- client.close()
- // add back to test if there is duplicated close() call
- client.conn.on('drain', () => {
- t.fail('double execute client close function')
- })
- client.close()
- })
- })
|