| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596 |
- const { Readable } = require('stream')
- const Packet = require('aedes-packet')
- function abstractPersistence (opts) {
// Harness configuration supplied by the module under test.
const { test } = opts
let { persistence: _persistence } = opts
const { waitForReady } = opts
// mqemitter is required lazily, right here, so that modules which provide
// their own buildEmitter never need the default emitter to be installed.
const buildEmitter = opts.buildEmitter || require('mqemitter')
// A factory declaring no parameters is treated as synchronous; wrap it so the
// rest of the harness can always use the callback form.
if (_persistence.length === 0) {
  _persistence = function asyncify (cb) {
    const created = opts.persistence()
    cb(null, created)
  }
}
// Builds a fresh persistence instance wired to a stub broker and hands it to
// cb(err, instance). When waitForReady is set, cb is deferred until the
// instance emits 'ready' (or 'error').
function persistence (cb) {
  const mq = buildEmitter()
  const broker = {
    id: 'broker-42',
    mq,
    publish: mq.emit.bind(mq),
    subscribe: mq.on.bind(mq),
    unsubscribe: mq.removeListener.bind(mq),
    counter: 0
  }

  _persistence(function onCreated (err, instance) {
    if (!instance) {
      cb(err, instance)
      return
    }

    // Wait for ready event, if applicable, to ensure the persistence isn't
    // destroyed while it's still being set up.
    // https://github.com/mcollina/aedes-persistence-redis/issues/41
    if (waitForReady) {
      // The listeners must be attached before the broker is assigned, because
      // assigning the broker can itself cause 'ready' to be emitted.
      instance.on('ready', function onReady () {
        instance.removeListener('error', cb)
        cb(null, instance)
      })
      instance.on('error', cb)
    }

    instance.broker = broker

    // With waitForReady the 'ready' (or 'error') listener calls back instead.
    if (!waitForReady) {
      cb(err, instance)
    }
  })
}
// Legacy third party streams are typically not async-iterable. Such streams
// are wrapped in an object-mode Readable; anything that already exposes
// Symbol.asyncIterator is returned untouched.
function iterableStream (stream) {
  const isAsyncIterable = typeof stream[Symbol.asyncIterator] === 'function'
  if (isAsyncIterable) {
    return stream
  }
  const wrapper = new Readable({ objectMode: true })
  return wrapper.wrap(stream)
}
// end of legacy third party streams support
// Drains a stream (legacy or async-iterable) and resolves with every item it
// produced, in arrival order.
async function getArrayFromStream (stream) {
  const collected = []
  const source = iterableStream(stream)
  for await (const entry of source) {
    collected.push(entry)
  }
  return collected
}
// Calls fn for each item of the stream, one at a time: the next item is only
// pulled once the value returned by fn has been awaited.
async function streamForEach (stream, fn) {
  const source = iterableStream(stream)
  for await (const entry of source) {
    await fn(entry)
  }
}
// Stores a retained QoS 0 publish packet on the instance and calls
// cb(err, packet) with the packet object that was handed to the store.
// opts.topic and opts.payload override the defaults when truthy.
function storeRetained (instance, opts, cb) {
  const overrides = opts || {}
  const brokerId = instance.broker.id
  const topic = overrides.topic || 'hello/world'
  const payload = overrides.payload || Buffer.from('muahah')
  const packet = { cmd: 'publish', id: brokerId, topic, payload, qos: 0, retain: true }

  instance.storeRetained(packet, function onStored (err) {
    cb(err, packet)
  })
}
// Stores one retained packet on a fresh instance, then checks that looking it
// up with `pattern` yields exactly that packet. An array pattern goes through
// createRetainedStreamCombi, anything else through createRetainedStream.
function matchRetainedWithPattern (t, pattern, opts) {
  persistence((err, instance) => {
    if (err) { throw err }

    storeRetained(instance, opts, (storeErr, packet) => {
      t.notOk(storeErr, 'no error')

      const stream = Array.isArray(pattern)
        ? instance.createRetainedStreamCombi(pattern)
        : instance.createRetainedStream(pattern)

      getArrayFromStream(stream).then(found => {
        t.deepEqual(found, [packet], 'must return the packet')
        instance.destroy(t.end.bind(t))
      })
    })
  })
}
// Registers a test named `title` whose body receives the test object and a
// freshly created persistence instance. Instance creation errors are thrown.
function testInstance (title, cb) {
  test(title, function withInstance (t) {
    persistence(function onInstance (err, instance) {
      if (err) { throw err }
      cb(t, instance)
    })
  })
}
// Asserts that a queued packet has no messageId assigned and loosely
// deep-equals `expected`. A null messageId is rewritten to undefined on the
// packet itself before comparing.
function testPacket (t, packet, expected) {
  const hasNullId = packet.messageId === null
  if (hasNullId) {
    packet.messageId = undefined
  }
  t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
  t.deepLooseEqual(packet, expected, 'must return the packet')
}
// Shallow-copies the own enumerable properties of obj onto a plain object, so
// that class instances can be compared without regard to their prototype.
function deClassed (obj) {
  const plain = {}
  Object.assign(plain, obj)
  return plain
}
// Retained-message lookups. Every case stores the default retained packet
// (topic 'hello/world') and expects the given pattern, or list of patterns,
// to return it.
for (const [title, pattern] of [
  ['store and look up retained messages', 'hello/world'],
  ['look up retained messages with a # pattern', '#'],
  ['look up retained messages with a hello/world/# pattern', 'hello/world/#'],
  ['look up retained messages with a + pattern', 'hello/+'],
  ['look up retained messages with multiple patterns', ['hello/+', 'other/hello']]
]) {
  test(title, t => {
    matchRetainedWithPattern(t, pattern)
  })
}
// Issues 1000 storeRetained calls back to back and checks in each callback
// that the packet's brokerCounter equals its 1-based position.
testInstance('store multiple retained messages in order', (t, instance) => {
  const totalMessages = 1000
  const retained = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    retain: true
  }
  let completed = 0

  const storeAndCheck = index => {
    const packet = new Packet(retained, instance.broker)
    instance.storeRetained(packet, err => {
      t.notOk(err, 'no error')
      t.equal(packet.brokerCounter, index + 1, 'packet stored in order')
      completed += 1
      if (completed === totalMessages) {
        instance.destroy(t.end.bind(t))
      }
    })
  }

  for (let index = 0; index < totalMessages; index += 1) {
    storeAndCheck(index)
  }
})
// Storing a retained packet with an empty payload on the same topic must
// leave nothing to be found by a '#' lookup.
testInstance('remove retained message', (t, instance) => {
  function verifyRemoved (err) {
    t.notOk(err, 'no error')
    const stream = instance.createRetainedStream('#')
    getArrayFromStream(stream).then(found => {
      t.deepEqual(found, [], 'must return an empty list')
      instance.destroy(t.end.bind(t))
    })
  }

  storeRetained(instance, {}, err => {
    t.notOk(err, 'no error')
    storeRetained(instance, { payload: Buffer.alloc(0) }, verifyRemoved)
  })
})
// Two retained packets stored on the same topic: a '#' lookup must return
// only the second one.
testInstance('storing twice a retained message should keep only the last', (t, instance) => {
  function verifyLastKept (err, lastPacket) {
    t.notOk(err, 'no error')
    const stream = instance.createRetainedStream('#')
    getArrayFromStream(stream).then(found => {
      t.deepEqual(found, [lastPacket], 'must return the last packet')
      instance.destroy(t.end.bind(t))
    })
  }

  storeRetained(instance, {}, err => {
    t.notOk(err, 'no error')
    storeRetained(instance, { payload: Buffer.from('ahah') }, verifyLastKept)
  })
})
// Checks that storeRetained keeps its own copy of the packet: the caller's
// object is mutated after the store completes, and the lookup must still
// return the packet as it was when stored.
testInstance('Create a new packet while storing a retained message', (t, instance) => {
  // Fixed topic and payload. These used to be read from `opts`, which in this
  // scope is the harness options object rather than per-test overrides.
  const packet = {
    cmd: 'publish',
    id: instance.broker.id,
    topic: 'hello/world',
    payload: Buffer.from('muahah'),
    qos: 0,
    retain: true
  }
  // Snapshot taken before storing; this is what the lookup must return.
  const newPacket = Object.assign({}, packet)

  instance.storeRetained(packet, err => {
    t.notOk(err, 'no error')
    // packet reference change to check if a new packet is stored always
    packet.retain = false
    const stream = instance.createRetainedStream('#')
    getArrayFromStream(stream).then(list => {
      t.deepEqual(list, [newPacket], 'must return the last packet')
      instance.destroy(t.end.bind(t))
    })
  })
})
// Subscriptions added for a client (the QoS 0 one included) must be returned
// unchanged by subscriptionsByClient, together with the same client object.
testInstance('store and look up subscriptions by client', (t, instance) => {
  const client = { id: 'abcde' }
  const makeSub = (topic, qos) => ({ topic, qos, rh: 0, rap: true, nl: false })
  const subs = [makeSub('hello', 1), makeSub('matteo', 1), makeSub('noqos', 0)]

  function onLookedUp (err, resubs, reReClient) {
    t.equal(reReClient, client, 'client must be the same')
    t.notOk(err, 'no error')
    t.deepEqual(resubs, subs)
    instance.destroy(t.end.bind(t))
  }

  instance.addSubscriptions(client, subs, (err, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.notOk(err, 'no error')
    instance.subscriptionsByClient(client, onLookedUp)
  })
})
// After removing 'hello', only the 'matteo' subscription must remain for the
// client.
testInstance('remove subscriptions by client', (t, instance) => {
  const client = { id: 'abcde' }
  const makeSub = topic => ({ topic, qos: 1, rh: 0, rap: true, nl: false })
  const subs = [makeSub('hello'), makeSub('matteo')]

  const verifyRemaining = (err, resubs, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.notOk(err, 'no error')
    t.deepEqual(resubs, [makeSub('matteo')])
    instance.destroy(t.end.bind(t))
  }

  const afterRemoval = (err, reClient) => {
    t.notOk(err, 'no error')
    t.equal(reClient, client, 'client must be the same')
    instance.subscriptionsByClient(client, verifyRemaining)
  }

  instance.addSubscriptions(client, subs, err => {
    t.notOk(err, 'no error')
    instance.removeSubscriptions(client, ['hello'], afterRemoval)
  })
})
// A lookup for topic 'hello' must return the 'hello/#' and 'hello'
// subscriptions, in that order and tagged with the client id, but not
// 'matteo'.
testInstance('store and look up subscriptions by topic', (t, instance) => {
  const client = { id: 'abcde' }
  const flags = { qos: 1, rh: 0, rap: true, nl: false }
  const subs = ['hello', 'hello/#', 'matteo'].map(topic => ({ topic, ...flags }))

  function onByTopic (err, resubs) {
    t.notOk(err, 'no error')
    const expected = ['hello/#', 'hello'].map(topic => ({ clientId: client.id, topic, ...flags }))
    t.deepEqual(resubs, expected)
    instance.destroy(t.end.bind(t))
  }

  instance.addSubscriptions(client, subs, err => {
    t.notOk(err, 'no error')
    instance.subscriptionsByTopic('hello', onByTopic)
  })
})
// Two clients subscribe to the same topic; getClientList for that topic must
// stream both client ids, client1 first.
testInstance('get client list after subscriptions', (t, instance) => {
  const client1 = { id: 'abcde' }
  const client2 = { id: 'efghi' }
  const subs = [{ topic: 'helloagain', qos: 1 }]

  function verifyClientList (err) {
    t.notOk(err, 'no error for client 2')
    const stream = instance.getClientList(subs[0].topic)
    getArrayFromStream(stream).then(ids => {
      t.deepEqual(ids, [client1.id, client2.id])
      instance.destroy(t.end.bind(t))
    })
  }

  instance.addSubscriptions(client1, subs, err => {
    t.notOk(err, 'no error for client 1')
    instance.addSubscriptions(client2, subs, verifyClientList)
  })
})
// Two clients subscribe, then client2 unsubscribes; getClientList must stream
// only client1's id.
testInstance('get client list after an unsubscribe', (t, instance) => {
  const client1 = { id: 'abcde' }
  const client2 = { id: 'efghi' }
  const subs = [{ topic: 'helloagain', qos: 1 }]

  function verifyClientList (err) {
    t.notOk(err, 'no error for removeSubscriptions')
    const stream = instance.getClientList(subs[0].topic)
    getArrayFromStream(stream).then(ids => {
      t.deepEqual(ids, [client1.id])
      instance.destroy(t.end.bind(t))
    })
  }

  function unsubscribeSecond (err) {
    t.notOk(err, 'no error for client 2')
    instance.removeSubscriptions(client2, [subs[0].topic], verifyClientList)
  }

  instance.addSubscriptions(client1, subs, err => {
    t.notOk(err, 'no error for client 1')
    instance.addSubscriptions(client2, subs, unsubscribeSecond)
  })
})
// Two clients subscribe, then client2 unsubscribes; the first entry returned
// by subscriptionsByTopic must belong to client1.
testInstance('get subscriptions list after an unsubscribe', (t, instance) => {
  const client1 = { id: 'abcde' }
  const client2 = { id: 'efghi' }
  const subs = [{ topic: 'helloagain', qos: 1 }]

  function verifyByTopic (err, clients) {
    t.notOk(err, 'no error getting subscriptions by topic')
    t.deepEqual(clients[0].clientId, client1.id)
    instance.destroy(t.end.bind(t))
  }

  function afterUnsubscribe (err) {
    t.notOk(err, 'no error for removeSubscriptions')
    instance.subscriptionsByTopic(subs[0].topic, verifyByTopic)
  }

  function unsubscribeSecond (err) {
    t.notOk(err, 'no error for client 2')
    instance.removeSubscriptions(client2, [subs[0].topic], afterUnsubscribe)
  }

  instance.addSubscriptions(client1, subs, err => {
    t.notOk(err, 'no error for client 1')
    instance.addSubscriptions(client2, subs, unsubscribeSecond)
  })
})
// The QoS 0 'hello' subscription must come back from subscriptionsByClient
// but must not be matched by subscriptionsByTopic('hello'); only 'hello/#'
// (QoS 1) is expected there.
testInstance('QoS 0 subscriptions, restored but not matched', (t, instance) => {
  const client = { id: 'abcde' }
  const makeSub = (topic, qos) => ({ topic, qos, rh: 0, rap: true, nl: false })
  const subs = [makeSub('hello', 0), makeSub('hello/#', 1), makeSub('matteo', 1)]

  function onByTopic (err, resubs2) {
    t.notOk(err, 'no error')
    t.deepEqual(resubs2, [{
      clientId: client.id,
      topic: 'hello/#',
      qos: 1,
      rh: 0,
      rap: true,
      nl: false
    }])
    instance.destroy(t.end.bind(t))
  }

  function onByClient (err, resubs) {
    t.notOk(err, 'no error')
    t.deepEqual(resubs, subs)
    instance.subscriptionsByTopic('hello', onByTopic)
  }

  instance.addSubscriptions(client, subs, err => {
    t.notOk(err, 'no error')
    instance.subscriptionsByClient(client, onByClient)
  })
})
// After cleanSubscriptions the client must have no subscriptions by topic,
// null subscriptions by client, and the offline counters must both be zero.
testInstance('clean subscriptions', (t, instance) => {
  const client = { id: 'abcde' }
  const subs = [{ topic: 'hello', qos: 1 }, { topic: 'matteo', qos: 1 }]

  instance.addSubscriptions(client, subs, err => {
    t.notOk(err, 'no error')
    instance.cleanSubscriptions(client, onCleaned)
  })

  function onCleaned (err) {
    t.notOk(err, 'no error')
    instance.subscriptionsByTopic('hello', onByTopic)
  }

  function onByTopic (err, resubs) {
    t.notOk(err, 'no error')
    t.deepEqual(resubs, [], 'no subscriptions')
    instance.subscriptionsByClient(client, onByClient)
  }

  function onByClient (err, resubs) {
    t.error(err)
    t.deepEqual(resubs, null, 'no subscriptions')
    instance.countOffline(onCounts)
  }

  function onCounts (err, subsCount, clientsCount) {
    t.error(err, 'no error')
    t.equal(subsCount, 0, 'no subscriptions added')
    t.equal(clientsCount, 0, 'no clients added')
    instance.destroy(t.end.bind(t))
  }
})
// cleanSubscriptions on a client that never subscribed must succeed and leave
// every lookup and counter empty.
testInstance('clean subscriptions with no active subscriptions', (t, instance) => {
  const client = { id: 'abcde' }

  instance.cleanSubscriptions(client, err => {
    t.notOk(err, 'no error')
    instance.subscriptionsByTopic('hello', onByTopic)
  })

  function onByTopic (err, resubs) {
    t.notOk(err, 'no error')
    t.deepEqual(resubs, [], 'no subscriptions')
    instance.subscriptionsByClient(client, onByClient)
  }

  function onByClient (err, resubs) {
    t.error(err)
    t.deepEqual(resubs, null, 'no subscriptions')
    instance.countOffline(onCounts)
  }

  function onCounts (err, subsCount, clientsCount) {
    t.error(err, 'no error')
    t.equal(subsCount, 0, 'no subscriptions added')
    t.equal(clientsCount, 0, 'no clients added')
    instance.destroy(t.end.bind(t))
  }
})
// Subscribing to the same topic twice in one call (QoS 0, then QoS 1) must
// leave a single QoS 1 subscription, visible by client and by topic, and
// counted once.
testInstance('same topic, different QoS', (t, instance) => {
  const client = { id: 'abcde' }
  const makeSub = qos => ({ topic: 'hello', qos, rh: 0, rap: true, nl: false })
  const subs = [makeSub(0), makeSub(1)]

  instance.addSubscriptions(client, subs, (err, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.error(err, 'no error')
    instance.subscriptionsByClient(client, onByClient)
  })

  function onByClient (err, subsForClient) {
    t.error(err, 'no error')
    t.deepEqual(subsForClient, [makeSub(1)])
    instance.subscriptionsByTopic('hello', onByTopic)
  }

  function onByTopic (err, subsForTopic) {
    t.error(err, 'no error')
    t.deepEqual(subsForTopic, [{
      clientId: 'abcde',
      topic: 'hello',
      qos: 1,
      rh: 0,
      rap: true,
      nl: false
    }])
    instance.countOffline(onCounts)
  }

  function onCounts (err, subsCount, clientsCount) {
    t.error(err, 'no error')
    t.equal(subsCount, 1, 'one subscription added')
    t.equal(clientsCount, 1, 'one client added')
    instance.destroy(t.end.bind(t))
  }
})
// Re-subscribes the same topic with QoS 0, 1, 2, 1, 0 in successive calls.
// After each call the stored subscription must carry the latest QoS; with
// QoS 0 it must not be matched by topic nor counted as a subscription.
testInstance('replace subscriptions', (t, instance) => {
  const client = { id: 'abcde' }
  const topic = 'hello'
  const sub = { topic, rh: 0, rap: true, nl: false }
  const subByTopic = { clientId: client.id, topic, rh: 0, rap: true, nl: false }
  const qosSequence = [0, 1, 2, 1, 0]

  // Subscribes with the given QoS, verifies every view of the store, then
  // calls cb with no arguments.
  function check (qos, cb) {
    subByTopic.qos = qos
    sub.qos = qos

    instance.addSubscriptions(client, [sub], (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.error(err, 'no error')
      instance.subscriptionsByClient(client, onByClient)
    })

    function onByClient (err, subsForClient) {
      t.error(err, 'no error')
      t.deepEqual(subsForClient, [sub])
      instance.subscriptionsByTopic(topic, onByTopic)
    }

    function onByTopic (err, subsForTopic) {
      t.error(err, 'no error')
      t.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic])
      instance.countOffline(onCounts)
    }

    function onCounts (err, subsCount, clientsCount) {
      t.error(err, 'no error')
      if (qos === 0) {
        t.equal(subsCount, 0, 'no subscriptions added')
      } else {
        t.equal(subsCount, 1, 'one subscription added')
      }
      t.equal(clientsCount, 1, 'one client added')
      cb()
    }
  }

  // Walks the QoS sequence one step at a time, then tears the instance down.
  function runFrom (index) {
    if (index === qosSequence.length) {
      instance.destroy(t.end.bind(t))
      return
    }
    check(qosSequence[index], () => {
      runFrom(index + 1)
    })
  }

  runFrom(0)
})
// The same topic is listed five times in one addSubscriptions call with QoS
// 0, 1, 2, 1, 0; the last entry (QoS 0) must win, so nothing is matched by
// topic or counted as a subscription.
testInstance('replace subscriptions in same call', (t, instance) => {
  const client = { id: 'abcde' }
  const topic = 'hello'
  const makeSub = qos => ({ topic, qos, rh: 0, rap: true, nl: false })
  const subs = [0, 1, 2, 1, 0].map(makeSub)

  instance.addSubscriptions(client, subs, (err, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.error(err, 'no error')
    instance.subscriptionsByClient(client, onByClient)
  })

  function onByClient (err, subsForClient) {
    t.error(err, 'no error')
    t.deepEqual(subsForClient, [makeSub(0)])
    instance.subscriptionsByTopic(topic, onByTopic)
  }

  function onByTopic (err, subsForTopic) {
    t.error(err, 'no error')
    t.deepEqual(subsForTopic, [])
    instance.countOffline(onCounts)
  }

  function onCounts (err, subsCount, clientsCount) {
    t.error(err, 'no error')
    t.equal(subsCount, 0, 'no subscriptions added')
    t.equal(clientsCount, 1, 'one client added')
    instance.destroy(t.end.bind(t))
  }
})
// Adds two QoS 1 subscriptions and one QoS 0 subscription, then removes them
// one by one (the last one twice), checking the offline counters after every
// step.
testInstance('store and count subscriptions', (t, instance) => {
  const client = { id: 'abcde' }
  const subs = [
    { topic: 'hello', qos: 1 },
    { topic: 'matteo', qos: 1 },
    { topic: 'noqos', qos: 0 }
  ]

  // Step: assert both offline counters, then continue.
  const expectCounts = (subsExpected, subsMessage, clientsExpected, clientsMessage) => next => {
    instance.countOffline((err, subsCount, clientsCount) => {
      t.error(err, 'no error')
      t.equal(subsCount, subsExpected, subsMessage)
      t.equal(clientsCount, clientsExpected, clientsMessage)
      next()
    })
  }

  // Step: remove a single topic for the client, then continue.
  const removeTopic = topic => next => {
    instance.removeSubscriptions(client, [topic], err => {
      t.error(err, 'no error')
      next()
    })
  }

  const steps = [
    expectCounts(2, 'two subscriptions added', 1, 'one client added'),
    removeTopic('hello'),
    expectCounts(1, 'one subscription added', 1, 'one client added'),
    removeTopic('matteo'),
    expectCounts(0, 'zero subscriptions added', 1, 'one client added'),
    removeTopic('noqos'),
    expectCounts(0, 'zero subscriptions added', 0, 'zero clients added'),
    removeTopic('noqos'),
    expectCounts(0, 'zero subscriptions added', 0, 'zero clients added')
  ]

  // Runs the steps strictly in sequence; destroys the instance after the last.
  function runStep (index) {
    if (index === steps.length) {
      instance.destroy(t.end.bind(t))
      return
    }
    steps[index](() => {
      runStep(index + 1)
    })
  }

  instance.addSubscriptions(client, subs, (err, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.error(err, 'no error')
    runStep(0)
  })
})
// Two clients hold the same three subscriptions (two QoS 1, one QoS 0).
// Topics are removed one at a time and the offline counters are checked after
// each removal, including removals of unknown or already removed topics.
testInstance('count subscriptions with two clients', (t, instance) => {
  const client1 = { id: 'abcde' }
  const client2 = { id: 'fghij' }
  const subs = [
    { topic: 'hello', qos: 1 },
    { topic: 'matteo', qos: 1 },
    { topic: 'noqos', qos: 0 }
  ]

  // Removes `topics` for `client`, then asserts the offline counters.
  function remove (client, topics, expectedSubs, expectedClients, cb) {
    instance.removeSubscriptions(client, topics, (err, reClient) => {
      t.error(err, 'no error')
      t.equal(reClient, client, 'client must be the same')
      instance.countOffline((countErr, subsCount, clientsCount) => {
        t.error(countErr, 'no error')
        t.equal(subsCount, expectedSubs, 'subscriptions added')
        t.equal(clientsCount, expectedClients, 'clients added')
        cb()
      })
    })
  }

  // [client, topics to remove, expected subscription count, expected client count]
  const removals = [
    [client1, ['foobar'], 4, 2],
    [client1, ['hello'], 3, 2],
    [client1, ['hello'], 3, 2],
    [client1, ['matteo'], 2, 2],
    [client1, ['noqos'], 2, 1],
    [client2, ['hello'], 1, 1],
    [client2, ['matteo'], 0, 1],
    [client2, ['noqos'], 0, 0]
  ]

  // Applies the removals strictly in sequence, then destroys the instance.
  function runRemoval (index) {
    if (index === removals.length) {
      instance.destroy(t.end.bind(t))
      return
    }
    const [client, topics, expectedSubs, expectedClients] = removals[index]
    remove(client, topics, expectedSubs, expectedClients, () => {
      runRemoval(index + 1)
    })
  }

  instance.addSubscriptions(client1, subs, (err, reClient) => {
    t.equal(reClient, client1, 'client must be the same')
    t.error(err, 'no error')
    instance.addSubscriptions(client2, subs, (err2, reClient2) => {
      t.equal(reClient2, client2, 'client must be the same')
      t.error(err2, 'no error')
      runRemoval(0)
    })
  })
})
// Adding the same QoS 1 subscription twice must leave a single entry when
// looking the topic up.
testInstance('add duplicate subs to persistence for qos > 0', (t, instance) => {
  const client = { id: 'abcde' }
  const topic = 'hello'
  const subs = [{ topic, qos: 1, rh: 0, rap: true, nl: false }]

  function onByTopic (err, subsForTopic) {
    t.error(err, 'no error')
    t.deepEqual(subsForTopic, subs)
    instance.destroy(t.end.bind(t))
  }

  function onSecondAdd (err, resCLient) {
    t.equal(resCLient, client, 'client must be the same')
    t.error(err, 'no error')
    // by-topic results carry the client id, so add it to the expectation
    subs[0].clientId = client.id
    instance.subscriptionsByTopic(topic, onByTopic)
  }

  instance.addSubscriptions(client, subs, (err, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.error(err, 'no error')
    instance.addSubscriptions(client, subs, onSecondAdd)
  })
})
// Adding the same QoS 0 subscription twice must leave a single entry when
// looking the client up.
testInstance('add duplicate subs to persistence for qos 0', (t, instance) => {
  const client = { id: 'abcde' }
  const topic = 'hello'
  const subs = [{ topic, qos: 0, rh: 0, rap: true, nl: false }]

  function onByClient (err, subsForClient) {
    t.error(err, 'no error')
    t.deepEqual(subsForClient, subs)
    instance.destroy(t.end.bind(t))
  }

  function onSecondAdd (err, resCLient) {
    t.equal(resCLient, client, 'client must be the same')
    t.error(err, 'no error')
    instance.subscriptionsByClient(client, onByClient)
  }

  instance.addSubscriptions(client, subs, (err, reClient) => {
    t.equal(reClient, client, 'client must be the same')
    t.error(err, 'no error')
    instance.addSubscriptions(client, subs, onSecondAdd)
  })
})
// Issues two addSubscriptions calls for the same client without waiting for
// the first to complete, then checks that both subscriptions were stored.
testInstance('get topic list after concurrent subscriptions of a client', (t, instance) => {
  const client = { id: 'abcde' }
  const subs1 = [{
    topic: 'hello1',
    qos: 1,
    rh: 0,
    rap: true,
    nl: false
  }]
  const subs2 = [{
    topic: 'hello2',
    qos: 1,
    rh: 0,
    rap: true,
    nl: false
  }]
  // Number of addSubscriptions callbacks still outstanding.
  let calls = 2

  function done () {
    if (!--calls) {
      instance.subscriptionsByClient(client, (err, resubs) => {
        t.notOk(err, 'no error')
        // The two calls may complete in either order, so sort by topic before
        // comparing. The comparator must compare a with b: comparing b with
        // itself always yields 0 and leaves the array unsorted.
        resubs.sort((a, b) => a.topic.localeCompare(b.topic, 'en'))
        t.deepEqual(resubs, [subs1[0], subs2[0]])
        instance.destroy(t.end.bind(t))
      })
    }
  }

  instance.addSubscriptions(client, subs1, err => {
    t.notOk(err, 'no error for hello1')
    done()
  })
  instance.addSubscriptions(client, subs2, err => {
    t.notOk(err, 'no error for hello2')
    done()
  })
})
// A packet enqueued for one subscriber must come back from that client's
// outgoing stream without `length` and without an assigned messageId.
testInstance('add outgoing packet and stream it', (t, instance) => {
  const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const client = { id: sub.clientId }
  const brokerId = instance.broker.id
  const packet = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    dup: false,
    length: 14,
    retain: false,
    brokerId,
    brokerCounter: 42
  }
  const expected = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    retain: false,
    dup: false,
    brokerId,
    brokerCounter: 42,
    messageId: undefined
  }

  function verifyStreamed (list) {
    const streamed = list[0]
    testPacket(t, streamed, expected)
    instance.destroy(t.end.bind(t))
  }

  instance.outgoingEnqueue(sub, packet, err => {
    t.error(err)
    const stream = instance.outgoingStream(client)
    getArrayFromStream(stream).then(verifyStreamed)
  })
})
// A packet enqueued once for two subscribers through outgoingEnqueueCombi
// must be available on the outgoing stream of each client.
testInstance('add outgoing packet for multiple subs and stream to all', (t, instance) => {
  const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const sub2 = { clientId: 'fghih', topic: 'hello', qos: 1 }
  const subs = [sub, sub2]
  const client = { id: sub.clientId }
  const client2 = { id: sub2.clientId }
  const brokerId = instance.broker.id
  const packet = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    dup: false,
    length: 14,
    retain: false,
    brokerId,
    brokerCounter: 42
  }
  const expected = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    retain: false,
    dup: false,
    brokerId,
    brokerCounter: 42,
    messageId: undefined
  }

  function verifySecondClient (list2) {
    testPacket(t, list2[0], expected)
    instance.destroy(t.end.bind(t))
  }

  function verifyFirstClient (list) {
    testPacket(t, list[0], expected)
    const stream2 = instance.outgoingStream(client2)
    getArrayFromStream(stream2).then(verifySecondClient)
  }

  instance.outgoingEnqueueCombi(subs, packet, err => {
    t.error(err)
    const stream = instance.outgoingStream(client)
    getArrayFromStream(stream).then(verifyFirstClient)
  })
})
// Enqueues two packets and assigns them message ids, then feeds every packet
// from the outgoing stream back through outgoingUpdate. The packets handed to
// the update callbacks must match the two updated packets, in order.
testInstance('add outgoing packet as a string and pump', (t, instance) => {
  const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const client = { id: sub.clientId }
  const brokerId = instance.broker.id
  const packet1 = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    retain: false,
    brokerId,
    brokerCounter: 10
  }
  const packet2 = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('matteo'),
    qos: 1,
    retain: false,
    brokerId,
    brokerCounter: 50
  }
  const queue = []

  // Pushes one streamed packet through outgoingUpdate and records the packet
  // reported by its callback; resolves once that callback has run.
  async function clearQueue (data) {
    return new Promise(resolve => {
      instance.outgoingUpdate(client, data, (err, reClient, repacket) => {
        t.notOk(err, 'no error')
        queue.push(repacket)
        resolve()
      })
    })
  }

  enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => {
    enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => {
      const stream = instance.outgoingStream(client)
      streamForEach(stream, clearQueue).then(function done () {
        t.equal(queue.length, 2)
        // only compare contents when both packets actually arrived
        if (queue.length === 2) {
          t.deepEqual(deClassed(queue[0]), deClassed(updated1))
          t.deepEqual(deClassed(queue[1]), deClassed(updated2))
        }
        instance.destroy(t.end.bind(t))
      })
    })
  })
})
// Same as the plain enqueue-and-stream case, but with a string payload and
// going through outgoingEnqueueCombi with a single subscriber.
testInstance('add outgoing packet as a string and stream', (t, instance) => {
  const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const client = { id: sub.clientId }
  const brokerId = instance.broker.id
  const packet = {
    cmd: 'publish',
    topic: 'hello',
    payload: 'world',
    qos: 1,
    dup: false,
    length: 14,
    retain: false,
    brokerId,
    brokerCounter: 42
  }
  const expected = {
    cmd: 'publish',
    topic: 'hello',
    payload: 'world',
    qos: 1,
    retain: false,
    dup: false,
    brokerId,
    brokerCounter: 42,
    messageId: undefined
  }

  function verifyStreamed (list) {
    const streamed = list[0]
    testPacket(t, streamed, expected)
    instance.destroy(t.end.bind(t))
  }

  instance.outgoingEnqueueCombi([sub], packet, err => {
    t.error(err)
    const stream = instance.outgoingStream(client)
    getArrayFromStream(stream).then(verifyStreamed)
  })
})
// Reading the outgoing stream twice must yield the packet both times. The
// packet is enqueued with messageId 4242, yet must come back with an
// unassigned messageId.
testInstance('add outgoing packet and stream it twice', (t, instance) => {
  const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const client = { id: sub.clientId }
  const brokerId = instance.broker.id
  const packet = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    dup: false,
    length: 14,
    retain: false,
    brokerId,
    brokerCounter: 42,
    messageId: 4242
  }
  const expected = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    retain: false,
    dup: false,
    brokerId,
    brokerCounter: 42,
    messageId: undefined
  }

  function verifySecondRead (list2) {
    const streamedAgain = list2[0]
    testPacket(t, streamedAgain, expected)
    t.notEqual(streamedAgain, expected, 'packet must be a different object')
    instance.destroy(t.end.bind(t))
  }

  function verifyFirstRead (list) {
    testPacket(t, list[0], expected)
    const stream2 = instance.outgoingStream(client)
    getArrayFromStream(stream2).then(verifySecondRead)
  }

  instance.outgoingEnqueueCombi([sub], packet, err => {
    t.error(err)
    const stream = instance.outgoingStream(client)
    getArrayFromStream(stream).then(verifyFirstRead)
  })
})
// Enqueues `packet` for `sub`, then stores a Packet copy carrying `messageId`
// through outgoingUpdate. Asserts that the update callback receives the same
// client and packet objects, and finally calls callback(updated).
function enqueueAndUpdate (t, instance, client, sub, packet, messageId, callback) {
  function assignMessageId (err) {
    t.error(err)
    const updated = new Packet(packet)
    updated.messageId = messageId
    instance.outgoingUpdate(client, updated, (updateErr, reclient, repacket) => {
      t.error(updateErr)
      t.equal(reclient, client, 'client matches')
      t.equal(repacket, updated, 'packet matches')
      callback(updated)
    })
  }

  instance.outgoingEnqueueCombi([sub], packet, assignMessageId)
}
// Enqueues a packet, updates it with messageId 42 and checks that the outgoing
// stream returns exactly one packet that deep-equals the updated one.
testInstance('add outgoing packet and update messageId', (t, instance) => {
  const subscription = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const subscriber = { id: subscription.clientId }
  const published = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    dup: false,
    length: 14,
    retain: false,
    brokerId: instance.broker.id,
    brokerCounter: 42
  }

  function compareWithQueue (updated) {
    const queue = instance.outgoingStream(subscriber)
    // messageId is removed from both sides before the deep comparison
    delete updated.messageId
    getArrayFromStream(queue).then(list => {
      const head = list[0]
      delete head.messageId
      t.notEqual(head, updated, 'must not be the same object')
      t.deepEqual(deClassed(head), deClassed(updated), 'must return the packet')
      t.equal(list.length, 1, 'must return only one packet')
      instance.destroy(t.end.bind(t))
    })
  }

  enqueueAndUpdate(t, instance, subscriber, subscription, published, 42, compareWithQueue)
})
// Enqueues and updates two packets (messageIds 42 and 43), clears the first
// one and checks that only the second is left in the outgoing stream.
testInstance('add 2 outgoing packet and clear messageId', (t, instance) => {
  const subscription = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const subscriber = { id: subscription.clientId }

  // the two packets differ only in payload and brokerCounter
  const makePublish = (text, brokerCounter) => ({
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from(text),
    qos: 1,
    dup: false,
    length: 14,
    retain: false,
    brokerId: instance.broker.id,
    brokerCounter
  })
  const first = makePublish('world', 42)
  const second = makePublish('matteo', 43)

  function checkRemaining (updatedSecond) {
    const remaining = instance.outgoingStream(subscriber)
    delete updatedSecond.messageId
    getArrayFromStream(remaining).then(list => {
      const head = list[0]
      delete head.messageId
      t.notEqual(head, updatedSecond, 'must not be the same object')
      t.deepEqual(deClassed(head), deClassed(updatedSecond), 'must return the packet')
      t.equal(list.length, 1, 'must return only one packet')
      instance.destroy(t.end.bind(t))
    })
  }

  enqueueAndUpdate(t, instance, subscriber, subscription, first, 42, updatedFirst => {
    enqueueAndUpdate(t, instance, subscriber, subscription, second, 43, updatedSecond => {
      instance.outgoingClearMessageId(subscriber, updatedFirst, (err, cleared) => {
        t.error(err)
        t.deepEqual(cleared.messageId, 42, 'must have the same messageId')
        t.deepEqual(cleared.payload.toString(), first.payload.toString(), 'must have original payload')
        t.deepEqual(cleared.topic, first.topic, 'must have original topic')
        checkRemaining(updatedSecond)
      })
    })
  })
})
// Fills the outgoing queue with twice as many packets as the stream's
// readableHighWaterMark, checks that all of them can be streamed back, clears
// every one by messageId and checks that the queue ends up empty.
testInstance('add many outgoing packets and clear messageIds', async (t, instance) => {
  const sub = {
    clientId: 'abcde', topic: 'hello', qos: 1
  }
  const client = {
    id: sub.clientId
  }
  const packet = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 1,
    dup: false,
    length: 14,
    retain: false
  }

  // Opens a fresh async-iterable view of the client's outgoing queue.
  const outStream = () => iterableStream(instance.outgoingStream(client))

  // we just need a stream to figure out the high watermark
  const stream = outStream()
  const total = stream.readableHighWaterMark * 2

  // Enqueues a packet with the given messageId and then updates it.
  // Rejects when either step reports an error; previously an outgoingUpdate
  // error was passed to `resolve` and silently dropped.
  function submitMessage (id) {
    return new Promise((resolve, reject) => {
      const p = new Packet(packet, instance.broker)
      p.messageId = id
      instance.outgoingEnqueue(sub, p, (err) => {
        if (err) {
          return reject(err)
        }
        instance.outgoingUpdate(client, p, (err) => {
          if (err) {
            return reject(err)
          }
          resolve()
        })
      })
    })
  }

  // Clears one packet and asserts that the persistence returns it unchanged.
  function clearMessage (p) {
    return new Promise((resolve) => {
      instance.outgoingClearMessageId(client, p, (err, received) => {
        t.error(err)
        t.deepEqual(received, p, 'must return the packet')
        resolve()
      })
    })
  }

  // Counts the truthy items currently yielded by the outgoing stream.
  async function countQueued () {
    let count = 0
    for await (const p of outStream()) {
      if (p) {
        count++
      }
    }
    return count
  }

  for (let i = 0; i < total; i++) {
    await submitMessage(i)
  }

  t.equal(await countQueued(), total, `outgoing queue must hold ${total} items`)

  for await (const p of outStream()) {
    await clearMessage(p)
  }

  t.equal(await countQueued(), 0, 'outgoing queue is empty')
  instance.destroy(t.end.bind(t))
})
// Enqueues and updates two QoS 2 publish packets that share messageId 42 but
// have different brokerCounters, then checks that both are streamed back with
// their own brokerCounter and messageId.
testInstance('update to publish w/ same messageId', (t, instance) => {
  const sub = {
    clientId: 'abcde', topic: 'hello', qos: 1
  }
  const client = {
    id: sub.clientId
  }
  const packet1 = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 2,
    dup: false,
    length: 14,
    retain: false,
    brokerId: instance.broker.id,
    brokerCounter: 42,
    messageId: 42
  }
  const packet2 = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 2,
    dup: false,
    length: 14,
    retain: false,
    brokerId: instance.broker.id,
    brokerCounter: 50,
    messageId: 42
  }

  // Every step reports its error through t.error, like the other tests here;
  // the callbacks used to ignore their error argument entirely.
  instance.outgoingEnqueue(sub, packet1, err => {
    t.error(err)
    instance.outgoingEnqueue(sub, packet2, err => {
      t.error(err)
      instance.outgoingUpdate(client, packet1, err => {
        t.error(err)
        instance.outgoingUpdate(client, packet2, err => {
          t.error(err)
          const stream = instance.outgoingStream(client)
          getArrayFromStream(stream).then(list => {
            t.equal(list.length, 2, 'must have two items in queue')
            t.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match')
            t.equal(list[0].messageId, packet1.messageId, 'messageId must match')
            t.equal(list[1].brokerCounter, packet2.brokerCounter, 'brokerCounter must match')
            t.equal(list[1].messageId, packet2.messageId, 'messageId must match')
            instance.destroy(t.end.bind(t))
          })
        })
      })
    })
  })
})
// Enqueues a QoS 2 publish, assigns it messageId 42, then updates the queue
// entry with a pubrel for the same messageId and checks that the outgoing
// stream yields exactly that pubrel.
testInstance('update to pubrel', (t, instance) => {
  const subscription = { clientId: 'abcde', topic: 'hello', qos: 1 }
  const subscriber = { id: subscription.clientId }
  const published = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 2,
    dup: false,
    length: 14,
    retain: false,
    brokerId: instance.broker.id,
    brokerCounter: 42
  }

  // enqueueAndUpdate performs the enqueue + messageId update together with the
  // same error/identity assertions this test needs for its first two steps
  enqueueAndUpdate(t, instance, subscriber, subscription, published, 42, updated => {
    const pubrel = {
      cmd: 'pubrel',
      messageId: updated.messageId
    }

    instance.outgoingUpdate(subscriber, pubrel, err => {
      t.error(err)
      getArrayFromStream(instance.outgoingStream(subscriber)).then(list => {
        t.deepEqual(list, [pubrel], 'must return the packet')
        instance.destroy(t.end.bind(t))
      })
    })
  })
})
// Stores an incoming packet, reads it back by messageId, deletes it, and
// checks that a second lookup by the same messageId reports an error.
testInstance('add incoming packet, get it, and clear with messageId', (t, instance) => {
  const sender = { id: 'abcde' }
  const stored = {
    cmd: 'publish',
    topic: 'hello',
    payload: Buffer.from('world'),
    qos: 2,
    dup: false,
    length: 14,
    retain: false,
    messageId: 42
  }
  // a fresh lookup object is built for every incomingGetPacket call
  const lookup = () => ({ messageId: stored.messageId })

  function expectGone () {
    instance.incomingGetPacket(sender, lookup(), err => {
      t.ok(err, 'must error')
      instance.destroy(t.end.bind(t))
    })
  }

  instance.incomingStorePacket(sender, stored, storeErr => {
    t.error(storeErr)
    instance.incomingGetPacket(sender, lookup(), (getErr, retrieved) => {
      t.error(getErr)
      // adjusting the objects so they match
      delete retrieved.brokerCounter
      delete retrieved.brokerId
      delete stored.length
      t.deepLooseEqual(retrieved, stored, 'retrieved packet must be deeply equal')
      t.notEqual(retrieved, stored, 'retrieved packet must not be the same objet')
      instance.incomingDelPacket(sender, retrieved, delErr => {
        t.error(delErr)
        expectGone()
      })
    })
  })
})
// Stores a will for a client, fetches it, deletes it, and finally checks that
// fetching again yields no will. Every callback must receive the same client.
testInstance('store, fetch and delete will message', (t, instance) => {
  const owner = { id: '12345' }
  const will = {
    topic: 'hello/died',
    payload: Buffer.from('muahahha'),
    qos: 0,
    retain: true
  }

  // assertions shared by the getWill/delWill callbacks that should see the will
  function assertWillReturned (err, packet, c) {
    t.error(err, 'no error')
    t.deepEqual(packet, will, 'will matches')
    t.equal(c, owner, 'client matches')
  }

  function expectNoWill () {
    instance.getWill(owner, (err, packet, c) => {
      t.error(err, 'no error')
      t.notOk(packet, 'no will after del')
      t.equal(c, owner, 'client matches')
      instance.destroy(t.end.bind(t))
    })
  }

  instance.putWill(owner, will, (putErr, c) => {
    t.error(putErr, 'no error')
    t.equal(c, owner, 'client matches')
    instance.getWill(owner, (getErr, fetched, fetchedFor) => {
      assertWillReturned(getErr, fetched, fetchedFor)
      // delWill is called with the brokerId reported by the fetched will
      owner.brokerId = fetched.brokerId
      instance.delWill(owner, (delErr, deleted, deletedFor) => {
        assertWillReturned(delErr, deleted, deletedFor)
        expectNoWill()
      })
    })
  })
})
// Stores one will and checks that streamWill() yields it annotated with the
// client id and the broker id; the will is deleted again from the chunk handler.
testInstance('stream all will messages', (t, instance) => {
  const owner = { id: '12345' }
  const will = {
    topic: 'hello/died',
    payload: Buffer.from('muahahha'),
    qos: 0,
    retain: true
  }

  const onChunk = chunk => {
    const expectedChunk = {
      clientId: owner.id,
      brokerId: instance.broker.id,
      topic: 'hello/died',
      payload: Buffer.from('muahahha'),
      qos: 0,
      retain: true
    }
    t.deepEqual(chunk, expectedChunk, 'packet matches')
    instance.delWill(owner, err => {
      t.error(err, 'no error')
      instance.destroy(t.end.bind(t))
    })
  }

  instance.putWill(owner, will, (err, c) => {
    t.error(err, 'no error')
    t.equal(c, owner, 'client matches')
    streamForEach(instance.streamWill(), onChunk)
  })
})
// Stores one will under the instance's original broker id and a second one
// after switching instance.broker.id to 'anotherBroker'. streamWill is then
// called with a map that only contains 'anotherBroker', and every chunk is
// asserted to be the will stored under the original broker id.
testInstance('stream all will message for unknown brokers', (t, instance) => {
  const originalId = instance.broker.id
  const firstClient = { id: '42' }
  const secondClient = { id: '24' }
  const makeWill = topic => ({
    topic,
    payload: Buffer.from('muahahha'),
    qos: 0,
    retain: true
  })
  const firstWill = makeWill('hello/died42')
  const secondWill = makeWill('hello/died24')

  const onChunk = chunk => {
    const expectedChunk = {
      clientId: firstClient.id,
      brokerId: originalId,
      topic: 'hello/died42',
      payload: Buffer.from('muahahha'),
      qos: 0,
      retain: true
    }
    t.deepEqual(chunk, expectedChunk, 'packet matches')
    instance.delWill(firstClient, err => {
      t.error(err, 'no error')
      instance.destroy(t.end.bind(t))
    })
  }

  instance.putWill(firstClient, firstWill, (firstErr, c) => {
    t.error(firstErr, 'no error')
    t.equal(c, firstClient, 'client matches')
    instance.broker.id = 'anotherBroker'
    instance.putWill(secondClient, secondWill, (secondErr, c2) => {
      t.error(secondErr, 'no error')
      t.equal(c2, secondClient, 'client matches')
      const knownBrokers = { anotherBroker: Date.now() }
      streamForEach(instance.streamWill(knownBrokers), onChunk)
    })
  })
})
// Stores a will, switches instance.broker.id to 'anotherBroker', tags the
// client with that new broker id and checks that delWill reports no error.
testInstance('delete wills from dead brokers', (t, instance) => {
  const owner = { id: '42' }
  const will = {
    topic: 'hello/died42',
    payload: Buffer.from('muahahha'),
    qos: 0,
    retain: true
  }

  function deleteAsOtherBroker () {
    instance.broker.id = 'anotherBroker'
    owner.brokerId = instance.broker.id
    instance.delWill(owner, err => {
      t.error(err, 'no error')
      instance.destroy(t.end.bind(t))
    })
  }

  instance.putWill(owner, will, (err, c) => {
    t.error(err, 'no error')
    t.equal(c, owner, 'client matches')
    deleteAsOtherBroker()
  })
})
// Calls outgoingClearMessageId for a client that never enqueued anything and
// checks that the callback reports no error.
testInstance('do not error if unkown messageId in outoingClearMessageId', (t, instance) => {
  const unknownClient = { id: 'abc-123' }

  function onCleared (err) {
    t.error(err)
    instance.destroy(t.end.bind(t))
  }

  instance.outgoingClearMessageId(unknownClient, 42, onCleared)
})
- }
- module.exports = abstractPersistence
|