abstract.js 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596
  1. const { Readable } = require('stream')
  2. const Packet = require('aedes-packet')
  3. function abstractPersistence (opts) {
  // Harness configuration supplied by the persistence module under test.
  const test = opts.test
  let _persistence = opts.persistence
  const waitForReady = opts.waitForReady
  // The default emitter is only required when no `buildEmitter` is supplied,
  // so modules that bring their own emitter never need it installed.
  const buildEmitter = opts.buildEmitter || require('mqemitter')
  // A factory that declares no parameters is treated as synchronous and is
  // adapted to the callback signature used by the rest of this file.
  if (_persistence.length === 0) {
    _persistence = function asyncify (done) {
      done(null, opts.persistence())
    }
  }
  /**
   * Creates a persistence instance through `_persistence` and attaches a
   * stub broker (id `broker-42`, backed by a fresh emitter) to it.
   *
   * When `waitForReady` is set, the callback is deferred until the instance
   * emits 'ready' or 'error', whichever comes first, and is invoked exactly
   * once. Otherwise the callback fires as soon as the broker is attached.
   *
   * @param {function(Error|null, object=)} cb - receives (err, instance)
   */
  function persistence (cb) {
    const mq = buildEmitter()
    const broker = {
      id: 'broker-42',
      mq,
      publish: mq.emit.bind(mq),
      subscribe: mq.on.bind(mq),
      unsubscribe: mq.removeListener.bind(mq),
      counter: 0
    }

    _persistence((err, instance) => {
      if (!instance) {
        cb(err, instance)
        return
      }

      if (!waitForReady) {
        instance.broker = broker
        cb(err, instance)
        return
      }

      // Wait for the ready event to ensure the persistence isn't destroyed
      // while it's still being set up.
      // https://github.com/mcollina/aedes-persistence-redis/issues/41
      //
      // Each handler detaches the other one, so the callback can never fire
      // twice (e.g. 'error' followed by 'ready', or a repeated 'ready').
      const onReady = () => {
        instance.removeListener('ready', onReady)
        instance.removeListener('error', onError)
        cb(null, instance)
      }
      const onError = (error) => {
        instance.removeListener('ready', onReady)
        instance.removeListener('error', onError)
        cb(error)
      }

      // We have to listen before setting broker because assigning it can
      // result in 'ready' being emitted.
      instance.on('ready', onReady)
      instance.on('error', onError)
      instance.broker = broker
    })
  }
  // Legacy third-party streams are typically not async-iterable. Such
  // streams are wrapped in an object-mode Readable; streams that already
  // expose Symbol.asyncIterator are returned untouched.
  function iterableStream (stream) {
    const isAsyncIterable = typeof stream[Symbol.asyncIterator] === 'function'
    return isAsyncIterable
      ? stream
      : new Readable({ objectMode: true }).wrap(stream)
  }
  // end of legacy third party streams support
  /**
   * Drains a stream and resolves with every item it produced, in order.
   *
   * @param {object} stream - a stream, made iterable via iterableStream()
   * @returns {Promise<Array>} the collected items
   */
  async function getArrayFromStream (stream) {
    const collected = []
    const source = iterableStream(stream)
    for await (const entry of source) {
      collected.push(entry)
    }
    return collected
  }
  /**
   * Calls `fn` for each item of the stream, one at a time, awaiting every
   * call before the next item is consumed.
   *
   * @param {object} stream - a stream, made iterable via iterableStream()
   * @param {function} fn - visitor, may return a promise
   * @returns {Promise<void>}
   */
  async function streamForEach (stream, fn) {
    const source = iterableStream(stream)
    for await (const entry of source) {
      await fn(entry)
    }
  }
  /**
   * Stores a retained QoS 0 publish packet on `instance` and hands the packet
   * back to the caller.
   *
   * @param {object} instance - persistence under test
   * @param {object} [opts] - optional `topic` / `payload` overrides
   * @param {function} cb - receives (err, packet)
   */
  function storeRetained (instance, opts, cb) {
    const overrides = opts || {}
    const packet = {
      cmd: 'publish',
      id: instance.broker.id,
      topic: overrides.topic || 'hello/world',
      payload: overrides.payload || Buffer.from('muahah'),
      qos: 0,
      retain: true
    }
    const onStored = err => {
      cb(err, packet)
    }
    instance.storeRetained(packet, onStored)
  }
  /**
   * Stores one retained packet on a fresh instance and asserts that looking
   * it up with `pattern` yields exactly that packet.
   *
   * @param {object} t - test handle
   * @param {string|string[]} pattern - an array selects the "combi" lookup
   * @param {object} [opts] - forwarded to storeRetained()
   */
  function matchRetainedWithPattern (t, pattern, opts) {
    persistence((err, instance) => {
      if (err) { throw err }

      storeRetained(instance, opts, (err, packet) => {
        t.notOk(err, 'no error')

        const stream = Array.isArray(pattern)
          ? instance.createRetainedStreamCombi(pattern)
          : instance.createRetainedStream(pattern)

        getArrayFromStream(stream).then(list => {
          t.deepEqual(list, [packet], 'must return the packet')
          instance.destroy(t.end.bind(t))
        })
      })
    })
  }
  /**
   * Registers a test that receives a freshly built persistence instance.
   * A failure while building the instance is thrown.
   *
   * @param {string} title - test title
   * @param {function} cb - test body, called as cb(t, instance)
   */
  function testInstance (title, cb) {
    const runWithInstance = t => {
      persistence((err, instance) => {
        if (err) { throw err }
        cb(t, instance)
      })
    }
    test(title, runWithInstance)
  }
  /**
   * Asserts that a queued packet has no messageId yet and loosely equals
   * `expected`. A `null` messageId is rewritten to `undefined` on the packet
   * itself before the comparison.
   *
   * @param {object} t - test handle
   * @param {object} packet - packet read back from the persistence (mutated)
   * @param {object} expected - packet to compare against
   */
  function testPacket (t, packet, expected) {
    const hasNullId = packet.messageId === null
    if (hasNullId) {
      packet.messageId = undefined
    }
    t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
    t.deepLooseEqual(packet, expected, 'must return the packet')
  }
  /**
   * Returns a shallow copy of `obj` as a plain object literal, dropping
   * whatever prototype the original had.
   *
   * @param {object} obj - object to copy
   * @returns {object} plain object holding obj's own enumerable properties
   */
  function deClassed (obj) {
    const plain = {}
    return Object.assign(plain, obj)
  }
  // Retained-message lookups: each case stores one retained packet and
  // expects the given pattern (or list of patterns) to match it.
  for (const [title, pattern] of [
    ['store and look up retained messages', 'hello/world'],
    ['look up retained messages with a # pattern', '#'],
    ['look up retained messages with a hello/world/# pattern', 'hello/world/#'],
    ['look up retained messages with a + pattern', 'hello/+'],
    ['look up retained messages with multiple patterns', ['hello/+', 'other/hello']]
  ]) {
    test(title, t => {
      matchRetainedWithPattern(t, pattern)
    })
  }
  // Fires 1000 storeRetained calls back to back and checks that each packet's
  // brokerCounter matches the position in which it was created.
  testInstance('store multiple retained messages in order', (t, instance) => {
    const totalMessages = 1000
    const template = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      retain: true
    }
    let pending = totalMessages

    const storeAndCheck = position => {
      const packet = new Packet(template, instance.broker)
      instance.storeRetained(packet, err => {
        t.notOk(err, 'no error')
        t.equal(packet.brokerCounter, position, 'packet stored in order')
        pending -= 1
        if (pending === 0) {
          instance.destroy(t.end.bind(t))
        }
      })
    }

    // Positions are 1-based, matching the expected brokerCounter values.
    for (let position = 1; position <= totalMessages; position++) {
      storeAndCheck(position)
    }
  })
  // Stores a retained packet, then stores an empty payload on the same topic
  // and expects the retained stream to come back empty.
  testInstance('remove retained message', (t, instance) => {
    const expectNothingRetained = err => {
      t.notOk(err, 'no error')
      const stream = instance.createRetainedStream('#')
      getArrayFromStream(stream).then(list => {
        t.deepEqual(list, [], 'must return an empty list')
        instance.destroy(t.end.bind(t))
      })
    }

    storeRetained(instance, {}, (err, packet) => {
      t.notOk(err, 'no error')
      const emptyPayload = { payload: Buffer.alloc(0) }
      storeRetained(instance, emptyPayload, expectNothingRetained)
    })
  })
  // Stores two retained packets on the same topic and expects only the
  // second one to be streamed back.
  testInstance('storing twice a retained message should keep only the last', (t, instance) => {
    const expectOnlyLast = (err, lastPacket) => {
      t.notOk(err, 'no error')
      const stream = instance.createRetainedStream('#')
      getArrayFromStream(stream).then(list => {
        t.deepEqual(list, [lastPacket], 'must return the last packet')
        instance.destroy(t.end.bind(t))
      })
    }

    storeRetained(instance, {}, (err, firstPacket) => {
      t.notOk(err, 'no error')
      storeRetained(instance, { payload: Buffer.from('ahah') }, expectOnlyLast)
    })
  })
  // Checks that the persistence keeps its own copy of a retained packet:
  // mutating the caller's packet after the store must not affect what is
  // streamed back.
  testInstance('Create a new packet while storing a retained message', (t, instance) => {
    // NOTE(review): `opts` here is the harness options object of the
    // enclosing function, not per-test options; presumably carried over from
    // storeRetained(). Confirm the `topic` / `payload` overrides are intended.
    const packet = {
      cmd: 'publish',
      id: instance.broker.id,
      topic: opts.topic || 'hello/world',
      payload: opts.payload || Buffer.from('muahah'),
      qos: 0,
      retain: true
    }
    // Snapshot taken before the store, used as the expected value.
    const newPacket = Object.assign({}, packet)

    const onStored = err => {
      t.notOk(err, 'no error')
      // packet reference change to check if a new packet is stored always
      packet.retain = false
      const retainedStream = instance.createRetainedStream('#')
      getArrayFromStream(retainedStream).then(list => {
        t.deepEqual(list, [newPacket], 'must return the last packet')
        instance.destroy(t.end.bind(t))
      })
    }

    instance.storeRetained(packet, onStored)
  })
  // Adds three subscriptions for one client and reads them back unchanged.
  testInstance('store and look up subscriptions by client', (t, instance) => {
    const client = { id: 'abcde' }
    // Every subscription shares the same options; only topic and QoS vary.
    const makeSub = (topic, qos) => ({ topic, qos, rh: 0, rap: true, nl: false })
    const subs = [
      makeSub('hello', 1),
      makeSub('matteo', 1),
      makeSub('noqos', 0)
    ]

    const onLookedUp = (err, resubs, reReClient) => {
      t.equal(reReClient, client, 'client must be the same')
      t.notOk(err, 'no error')
      t.deepEqual(resubs, subs)
      instance.destroy(t.end.bind(t))
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.notOk(err, 'no error')
      instance.subscriptionsByClient(client, onLookedUp)
    })
  })
  // Adds two subscriptions, removes 'hello' and expects only 'matteo' to be
  // left for the client.
  testInstance('remove subscriptions by client', (t, instance) => {
    const client = { id: 'abcde' }
    const makeSub = topic => ({ topic, qos: 1, rh: 0, rap: true, nl: false })
    const subs = [makeSub('hello'), makeSub('matteo')]
    const remaining = [makeSub('matteo')]

    const onLookedUp = (err, resubs, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.notOk(err, 'no error')
      t.deepEqual(resubs, remaining)
      instance.destroy(t.end.bind(t))
    }

    const onRemoved = (err, reClient) => {
      t.notOk(err, 'no error')
      t.equal(reClient, client, 'client must be the same')
      instance.subscriptionsByClient(client, onLookedUp)
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.notOk(err, 'no error')
      instance.removeSubscriptions(client, ['hello'], onRemoved)
    })
  })
  // Looks up subscriptions matching topic 'hello': both the wildcard
  // 'hello/#' and the exact 'hello' subscription are expected, in that order.
  testInstance('store and look up subscriptions by topic', (t, instance) => {
    const client = { id: 'abcde' }
    const makeSub = topic => ({ topic, qos: 1, rh: 0, rap: true, nl: false })
    const subs = [makeSub('hello'), makeSub('hello/#'), makeSub('matteo')]
    // Entries returned by topic additionally carry the owning client id.
    const expected = [
      { clientId: client.id, ...makeSub('hello/#') },
      { clientId: client.id, ...makeSub('hello') }
    ]

    instance.addSubscriptions(client, subs, err => {
      t.notOk(err, 'no error')
      instance.subscriptionsByTopic('hello', (err, resubs) => {
        t.notOk(err, 'no error')
        t.deepEqual(resubs, expected)
        instance.destroy(t.end.bind(t))
      })
    })
  })
  // Subscribes two clients to the same topic and expects both ids from
  // getClientList(), in subscription order.
  testInstance('get client list after subscriptions', (t, instance) => {
    const client1 = { id: 'abcde' }
    const client2 = { id: 'efghi' }
    const topic = 'helloagain'
    const subs = [{ topic, qos: 1 }]

    const listClients = () => {
      const stream = instance.getClientList(subs[0].topic)
      getArrayFromStream(stream).then(out => {
        t.deepEqual(out, [client1.id, client2.id])
        instance.destroy(t.end.bind(t))
      })
    }

    instance.addSubscriptions(client1, subs, err => {
      t.notOk(err, 'no error for client 1')
      instance.addSubscriptions(client2, subs, err => {
        t.notOk(err, 'no error for client 2')
        listClients()
      })
    })
  })
  // Subscribes two clients, unsubscribes the second one and expects only the
  // first id from getClientList().
  testInstance('get client list after an unsubscribe', (t, instance) => {
    const client1 = { id: 'abcde' }
    const client2 = { id: 'efghi' }
    const topic = 'helloagain'
    const subs = [{ topic, qos: 1 }]

    const onRemoved = (err, reClient) => {
      t.notOk(err, 'no error for removeSubscriptions')
      const stream = instance.getClientList(subs[0].topic)
      getArrayFromStream(stream).then(out => {
        t.deepEqual(out, [client1.id])
        instance.destroy(t.end.bind(t))
      })
    }

    instance.addSubscriptions(client1, subs, err => {
      t.notOk(err, 'no error for client 1')
      instance.addSubscriptions(client2, subs, err => {
        t.notOk(err, 'no error for client 2')
        instance.removeSubscriptions(client2, [subs[0].topic], onRemoved)
      })
    })
  })
  // Subscribes two clients, unsubscribes the second one and expects the first
  // entry returned by subscriptionsByTopic() to belong to the first client.
  testInstance('get subscriptions list after an unsubscribe', (t, instance) => {
    const client1 = { id: 'abcde' }
    const client2 = { id: 'efghi' }
    const topic = 'helloagain'
    const subs = [{ topic, qos: 1 }]

    const onRemoved = (err, reClient) => {
      t.notOk(err, 'no error for removeSubscriptions')
      instance.subscriptionsByTopic(subs[0].topic, (err, clients) => {
        t.notOk(err, 'no error getting subscriptions by topic')
        t.deepEqual(clients[0].clientId, client1.id)
        instance.destroy(t.end.bind(t))
      })
    }

    instance.addSubscriptions(client1, subs, err => {
      t.notOk(err, 'no error for client 1')
      instance.addSubscriptions(client2, subs, err => {
        t.notOk(err, 'no error for client 2')
        instance.removeSubscriptions(client2, [subs[0].topic], onRemoved)
      })
    })
  })
  // A QoS 0 subscription must be returned by subscriptionsByClient() but must
  // not show up in subscriptionsByTopic().
  testInstance('QoS 0 subscriptions, restored but not matched', (t, instance) => {
    const client = { id: 'abcde' }
    const makeSub = (topic, qos) => ({ topic, qos, rh: 0, rap: true, nl: false })
    const subs = [
      makeSub('hello', 0),
      makeSub('hello/#', 1),
      makeSub('matteo', 1)
    ]
    // Only the QoS 1 wildcard subscription is expected to match 'hello'.
    const matching = [{ clientId: client.id, ...makeSub('hello/#', 1) }]

    instance.addSubscriptions(client, subs, err => {
      t.notOk(err, 'no error')
      instance.subscriptionsByClient(client, (err, resubs) => {
        t.notOk(err, 'no error')
        t.deepEqual(resubs, subs)
        instance.subscriptionsByTopic('hello', (err, resubs2) => {
          t.notOk(err, 'no error')
          t.deepEqual(resubs2, matching)
          instance.destroy(t.end.bind(t))
        })
      })
    })
  })
  // After cleanSubscriptions() nothing may be left: no topic match, `null`
  // for the client lookup and zeroed offline counters.
  testInstance('clean subscriptions', (t, instance) => {
    const client = { id: 'abcde' }
    const subs = [
      { topic: 'hello', qos: 1 },
      { topic: 'matteo', qos: 1 }
    ]

    const checkCounters = () => {
      instance.countOffline((err, subsCount, clientsCount) => {
        t.error(err, 'no error')
        t.equal(subsCount, 0, 'no subscriptions added')
        t.equal(clientsCount, 0, 'no clients added')
        instance.destroy(t.end.bind(t))
      })
    }

    const checkByClient = () => {
      instance.subscriptionsByClient(client, (err, resubs) => {
        t.error(err)
        t.deepEqual(resubs, null, 'no subscriptions')
        checkCounters()
      })
    }

    const checkByTopic = () => {
      instance.subscriptionsByTopic('hello', (err, resubs) => {
        t.notOk(err, 'no error')
        t.deepEqual(resubs, [], 'no subscriptions')
        checkByClient()
      })
    }

    instance.addSubscriptions(client, subs, err => {
      t.notOk(err, 'no error')
      instance.cleanSubscriptions(client, err => {
        t.notOk(err, 'no error')
        checkByTopic()
      })
    })
  })
  // cleanSubscriptions() on a client that never subscribed must succeed and
  // leave the persistence empty.
  testInstance('clean subscriptions with no active subscriptions', (t, instance) => {
    const client = { id: 'abcde' }

    const checkCounters = () => {
      instance.countOffline((err, subsCount, clientsCount) => {
        t.error(err, 'no error')
        t.equal(subsCount, 0, 'no subscriptions added')
        t.equal(clientsCount, 0, 'no clients added')
        instance.destroy(t.end.bind(t))
      })
    }

    const checkByClient = () => {
      instance.subscriptionsByClient(client, (err, resubs) => {
        t.error(err)
        t.deepEqual(resubs, null, 'no subscriptions')
        checkCounters()
      })
    }

    instance.cleanSubscriptions(client, err => {
      t.notOk(err, 'no error')
      instance.subscriptionsByTopic('hello', (err, resubs) => {
        t.notOk(err, 'no error')
        t.deepEqual(resubs, [], 'no subscriptions')
        checkByClient()
      })
    })
  })
  // Two subscriptions on the same topic in one call: the later QoS 1 entry is
  // expected to win and to be counted once.
  testInstance('same topic, different QoS', (t, instance) => {
    const client = { id: 'abcde' }
    const makeSub = qos => ({ topic: 'hello', qos, rh: 0, rap: true, nl: false })
    const subs = [makeSub(0), makeSub(1)]

    const checkCounters = () => {
      instance.countOffline((err, subsCount, clientsCount) => {
        t.error(err, 'no error')
        t.equal(subsCount, 1, 'one subscription added')
        t.equal(clientsCount, 1, 'one client added')
        instance.destroy(t.end.bind(t))
      })
    }

    const checkByTopic = () => {
      instance.subscriptionsByTopic('hello', (err, subsForTopic) => {
        t.error(err, 'no error')
        t.deepEqual(subsForTopic, [{ clientId: 'abcde', ...makeSub(1) }])
        checkCounters()
      })
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.error(err, 'no error')
      instance.subscriptionsByClient(client, (err, subsForClient, sameClient) => {
        t.error(err, 'no error')
        t.deepEqual(subsForClient, [makeSub(1)])
        checkByTopic()
      })
    })
  })
  // Re-subscribes the same topic with QoS 0, 1, 2, 1, 0 in turn and checks
  // after each step that the stored subscription and counters follow.
  testInstance('replace subscriptions', (t, instance) => {
    const client = { id: 'abcde' }
    const topic = 'hello'
    const sub = { topic, rh: 0, rap: true, nl: false }
    const subByTopic = { clientId: client.id, topic, rh: 0, rap: true, nl: false }

    // Subscribes with `qos` and verifies lookups and counters, then calls cb.
    function check (qos, cb) {
      subByTopic.qos = qos
      sub.qos = qos
      instance.addSubscriptions(client, [sub], (err, reClient) => {
        t.equal(reClient, client, 'client must be the same')
        t.error(err, 'no error')
        instance.subscriptionsByClient(client, (err, subsForClient, sameClient) => {
          t.error(err, 'no error')
          t.deepEqual(subsForClient, [sub])
          instance.subscriptionsByTopic(topic, (err, subsForTopic) => {
            t.error(err, 'no error')
            // QoS 0 subscriptions are not expected in the topic lookup.
            t.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic])
            instance.countOffline((err, subsCount, clientsCount) => {
              t.error(err, 'no error')
              if (qos === 0) {
                t.equal(subsCount, 0, 'no subscriptions added')
              } else {
                t.equal(subsCount, 1, 'one subscription added')
              }
              t.equal(clientsCount, 1, 'one client added')
              cb()
            })
          })
        })
      })
    }

    const qosSequence = [0, 1, 2, 1, 0]
    function runFrom (index) {
      if (index === qosSequence.length) {
        instance.destroy(t.end.bind(t))
        return
      }
      check(qosSequence[index], () => {
        runFrom(index + 1)
      })
    }
    runFrom(0)
  })
  // Passes five subscriptions for the same topic in a single call; the last
  // one (QoS 0) is expected to be the one that sticks.
  testInstance('replace subscriptions in same call', (t, instance) => {
    const client = { id: 'abcde' }
    const topic = 'hello'
    const makeSub = qos => ({ topic, qos, rh: 0, rap: true, nl: false })
    const subs = [0, 1, 2, 1, 0].map(makeSub)

    const checkCounters = () => {
      instance.countOffline((err, subsCount, clientsCount) => {
        t.error(err, 'no error')
        t.equal(subsCount, 0, 'no subscriptions added')
        t.equal(clientsCount, 1, 'one client added')
        instance.destroy(t.end.bind(t))
      })
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.error(err, 'no error')
      instance.subscriptionsByClient(client, (err, subsForClient, sameClient) => {
        t.error(err, 'no error')
        t.deepEqual(subsForClient, [makeSub(0)])
        instance.subscriptionsByTopic(topic, (err, subsForTopic) => {
          t.error(err, 'no error')
          t.deepEqual(subsForTopic, [])
          checkCounters()
        })
      })
    })
  })
  // Tracks the offline counters while subscriptions are removed one by one;
  // the final step removes an already-removed topic again.
  testInstance('store and count subscriptions', (t, instance) => {
    const client = { id: 'abcde' }
    const subs = [
      { topic: 'hello', qos: 1 },
      { topic: 'matteo', qos: 1 },
      { topic: 'noqos', qos: 0 }
    ]

    // Asserts both offline counters, then continues with `next`.
    function expectCounts (subsExpected, subsMessage, clientsExpected, clientsMessage, next) {
      instance.countOffline((err, subsCount, clientsCount) => {
        t.error(err, 'no error')
        t.equal(subsCount, subsExpected, subsMessage)
        t.equal(clientsCount, clientsExpected, clientsMessage)
        next()
      })
    }

    // Removes one topic for the client and then checks the counters.
    function removeThenExpect (topic, subsExpected, subsMessage, clientsExpected, clientsMessage, next) {
      instance.removeSubscriptions(client, [topic], (err, reClient) => {
        t.error(err, 'no error')
        expectCounts(subsExpected, subsMessage, clientsExpected, clientsMessage, next)
      })
    }

    // [topic, expected subs, message, expected clients, message]
    const removals = [
      ['hello', 1, 'one subscription added', 1, 'one client added'],
      ['matteo', 0, 'zero subscriptions added', 1, 'one client added'],
      ['noqos', 0, 'zero subscriptions added', 0, 'zero clients added'],
      ['noqos', 0, 'zero subscriptions added', 0, 'zero clients added']
    ]

    function runRemoval (index) {
      if (index === removals.length) {
        instance.destroy(t.end.bind(t))
        return
      }
      const [topic, subsExpected, subsMessage, clientsExpected, clientsMessage] = removals[index]
      removeThenExpect(topic, subsExpected, subsMessage, clientsExpected, clientsMessage, () => {
        runRemoval(index + 1)
      })
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.error(err, 'no error')
      expectCounts(2, 'two subscriptions added', 1, 'one client added', () => {
        runRemoval(0)
      })
    })
  })
  // Two clients hold the same three subscriptions; topics are removed one at
  // a time and the offline counters are checked after every removal.
  testInstance('count subscriptions with two clients', (t, instance) => {
    const client1 = { id: 'abcde' }
    const client2 = { id: 'fghij' }
    const subs = [
      { topic: 'hello', qos: 1 },
      { topic: 'matteo', qos: 1 },
      { topic: 'noqos', qos: 0 }
    ]

    // Removes `subs` for `client`, asserts the counters, then calls cb.
    function remove (client, subs, expectedSubs, expectedClients, cb) {
      instance.removeSubscriptions(client, subs, (err, reClient) => {
        t.error(err, 'no error')
        t.equal(reClient, client, 'client must be the same')
        instance.countOffline((err, subsCount, clientsCount) => {
          t.error(err, 'no error')
          t.equal(subsCount, expectedSubs, 'subscriptions added')
          t.equal(clientsCount, expectedClients, 'clients added')
          cb()
        })
      })
    }

    // [client, topics to remove, expected subscriptions, expected clients]
    const steps = [
      [client1, ['foobar'], 4, 2],
      [client1, ['hello'], 3, 2],
      [client1, ['hello'], 3, 2],
      [client1, ['matteo'], 2, 2],
      [client1, ['noqos'], 2, 1],
      [client2, ['hello'], 1, 1],
      [client2, ['matteo'], 0, 1],
      [client2, ['noqos'], 0, 0]
    ]

    function runStep (index) {
      if (index === steps.length) {
        instance.destroy(t.end.bind(t))
        return
      }
      const [client, topics, expectedSubs, expectedClients] = steps[index]
      remove(client, topics, expectedSubs, expectedClients, () => {
        runStep(index + 1)
      })
    }

    instance.addSubscriptions(client1, subs, (err, reClient) => {
      t.equal(reClient, client1, 'client must be the same')
      t.error(err, 'no error')
      instance.addSubscriptions(client2, subs, (err, reClient) => {
        t.equal(reClient, client2, 'client must be the same')
        t.error(err, 'no error')
        runStep(0)
      })
    })
  })
  // Adding the same QoS 1 subscription twice must leave a single entry in the
  // topic lookup.
  testInstance('add duplicate subs to persistence for qos > 0', (t, instance) => {
    const client = { id: 'abcde' }
    const topic = 'hello'
    const subs = [{ topic, qos: 1, rh: 0, rap: true, nl: false }]

    const onAddedAgain = (err, resCLient) => {
      t.equal(resCLient, client, 'client must be the same')
      t.error(err, 'no error')
      // The topic lookup is compared against `subs` with the client id added.
      subs[0].clientId = client.id
      instance.subscriptionsByTopic(topic, (err, subsForTopic) => {
        t.error(err, 'no error')
        t.deepEqual(subsForTopic, subs)
        instance.destroy(t.end.bind(t))
      })
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.error(err, 'no error')
      instance.addSubscriptions(client, subs, onAddedAgain)
    })
  })
  // Adding the same QoS 0 subscription twice must leave a single entry in the
  // client lookup.
  testInstance('add duplicate subs to persistence for qos 0', (t, instance) => {
    const client = { id: 'abcde' }
    const topic = 'hello'
    const subs = [{ topic, qos: 0, rh: 0, rap: true, nl: false }]

    const onAddedAgain = (err, resCLient) => {
      t.equal(resCLient, client, 'client must be the same')
      t.error(err, 'no error')
      instance.subscriptionsByClient(client, (err, subsForClient, sameClient) => {
        t.error(err, 'no error')
        t.deepEqual(subsForClient, subs)
        instance.destroy(t.end.bind(t))
      })
    }

    instance.addSubscriptions(client, subs, (err, reClient) => {
      t.equal(reClient, client, 'client must be the same')
      t.error(err, 'no error')
      instance.addSubscriptions(client, subs, onAddedAgain)
    })
  })
  // Issues two addSubscriptions calls for the same client without waiting for
  // the first to finish, then expects both subscriptions to be stored.
  testInstance('get topic list after concurrent subscriptions of a client', (t, instance) => {
    const client = { id: 'abcde' }
    const subs1 = [{
      topic: 'hello1',
      qos: 1,
      rh: 0,
      rap: true,
      nl: false
    }]
    const subs2 = [{
      topic: 'hello2',
      qos: 1,
      rh: 0,
      rap: true,
      nl: false
    }]
    let calls = 2

    // Runs the verification once both addSubscriptions calls have returned.
    function done () {
      if (!--calls) {
        instance.subscriptionsByClient(client, (err, resubs) => {
          t.notOk(err, 'no error')
          // The two writes may complete in either order, so compare a copy
          // sorted by topic. The comparator must compare `a` against `b`;
          // comparing `b` with itself always yields 0 and sorts nothing.
          const sorted = [...resubs].sort((a, b) => a.topic.localeCompare(b.topic, 'en'))
          t.deepEqual(sorted, [subs1[0], subs2[0]])
          instance.destroy(t.end.bind(t))
        })
      }
    }

    instance.addSubscriptions(client, subs1, err => {
      t.notOk(err, 'no error for hello1')
      done()
    })
    instance.addSubscriptions(client, subs2, err => {
      t.notOk(err, 'no error for hello2')
      done()
    })
  })
  // Enqueues one outgoing packet for a subscription and expects to read it
  // back from the client's outgoing stream without a messageId.
  testInstance('add outgoing packet and stream it', (t, instance) => {
    const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
    const client = { id: sub.clientId }

    // Fields shared by the enqueued packet and the expected result; a fresh
    // object (and payload buffer) is built on every call.
    const basePacket = () => ({
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      retain: false,
      dup: false,
      brokerId: instance.broker.id,
      brokerCounter: 42
    })
    const packet = { ...basePacket(), length: 14 }
    const expected = { ...basePacket(), messageId: undefined }

    instance.outgoingEnqueue(sub, packet, err => {
      t.error(err)
      const stream = instance.outgoingStream(client)
      getArrayFromStream(stream).then(list => {
        const [received] = list
        testPacket(t, received, expected)
        instance.destroy(t.end.bind(t))
      })
    })
  })
  // Enqueues one outgoing packet for two subscriptions at once and expects
  // each client's outgoing stream to deliver it.
  testInstance('add outgoing packet for multiple subs and stream to all', (t, instance) => {
    const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
    const sub2 = { clientId: 'fghih', topic: 'hello', qos: 1 }
    const subs = [sub, sub2]
    const client = { id: sub.clientId }
    const client2 = { id: sub2.clientId }

    // Fields shared by the enqueued packet and the expected result; a fresh
    // object (and payload buffer) is built on every call.
    const basePacket = () => ({
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      retain: false,
      dup: false,
      brokerId: instance.broker.id,
      brokerCounter: 42
    })
    const packet = { ...basePacket(), length: 14 }
    const expected = { ...basePacket(), messageId: undefined }

    // Second half of the check: the other client's queue.
    const checkSecondClient = () => {
      const stream2 = instance.outgoingStream(client2)
      getArrayFromStream(stream2).then(list2 => {
        const [received] = list2
        testPacket(t, received, expected)
        instance.destroy(t.end.bind(t))
      })
    }

    instance.outgoingEnqueueCombi(subs, packet, err => {
      t.error(err)
      const stream = instance.outgoingStream(client)
      getArrayFromStream(stream).then(list => {
        const [received] = list
        testPacket(t, received, expected)
        checkSecondClient()
      })
    })
  })
  // Enqueues and updates two packets (messageIds 42 and 43), then feeds every
  // entry of the outgoing stream back through outgoingUpdate and checks that
  // the packets handed to the update callbacks match the two updated packets.
  testInstance('add outgoing packet as a string and pump', (t, instance) => {
    const sub = { clientId: 'abcde', topic: 'hello', qos: 1 }
    const client = { id: sub.clientId }

    const publishOf = (text, brokerCounter) => ({
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from(text),
      qos: 1,
      retain: false,
      brokerId: instance.broker.id,
      brokerCounter
    })
    const packet1 = publishOf('world', 10)
    const packet2 = publishOf('matteo', 50)

    // packets reported back by outgoingUpdate, in callback order
    const pumped = []

    const pump = async (data) => new Promise((resolve) => {
      instance.outgoingUpdate(client, data, (updateErr, sameClient, repacket) => {
        t.notOk(updateErr, 'no error')
        pumped.push(repacket)
        resolve()
      })
    })

    enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => {
      enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => {
        const outgoing = instance.outgoingStream(client)
        streamForEach(outgoing, pump).then(() => {
          t.equal(pumped.length, 2)
          // only compare contents when both packets actually arrived
          if (pumped.length === 2) {
            const [first, second] = pumped
            t.deepEqual(deClassed(first), deClassed(updated1))
            t.deepEqual(deClassed(second), deClassed(updated2))
          }
          instance.destroy(t.end.bind(t))
        })
      })
    })
  })
  // Same flow as the Buffer-payload streaming test, but the payload is the
  // plain string 'world' and the packet is enqueued via outgoingEnqueueCombi.
  testInstance('add outgoing packet as a string and stream', (t, instance) => {
    const clientId = 'abcde'
    const sub = { clientId, topic: 'hello', qos: 1 }
    const client = { id: clientId }
    const origin = { brokerId: instance.broker.id, brokerCounter: 42 }

    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      dup: false,
      length: 14,
      retain: false,
      ...origin
    }
    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      retain: false,
      dup: false,
      ...origin,
      messageId: undefined
    }

    instance.outgoingEnqueueCombi([sub], packet, enqueueErr => {
      t.error(enqueueErr)
      const outgoing = instance.outgoingStream(client)
      getArrayFromStream(outgoing).then(streamed => {
        testPacket(t, streamed[0], expected)
        instance.destroy(t.end.bind(t))
      })
    })
  })
  // Enqueues one packet (carrying messageId 4242) and reads the client's
  // outgoing stream twice in a row; both reads have their first entry checked
  // with testPacket against `expected`, whose messageId is undefined.
  testInstance('add outgoing packet and stream it twice', (t, instance) => {
    const clientId = 'abcde'
    const sub = { clientId, topic: 'hello', qos: 1 }
    const client = { id: clientId }
    const origin = { brokerId: instance.broker.id, brokerCounter: 42 }

    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false,
      ...origin,
      messageId: 4242
    }
    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      retain: false,
      dup: false,
      ...origin,
      messageId: undefined
    }

    // opens a fresh outgoing stream for the client and collects it into an array
    const readOutgoing = () => getArrayFromStream(instance.outgoingStream(client))

    instance.outgoingEnqueueCombi([sub], packet, enqueueErr => {
      t.error(enqueueErr)
      readOutgoing().then(firstPass => {
        testPacket(t, firstPass[0], expected)
        readOutgoing().then(secondPass => {
          const streamedAgain = secondPass[0]
          testPacket(t, streamedAgain, expected)
          // compares the streamed packet with the locally built `expected` object
          t.notEqual(streamedAgain, expected, 'packet must be a different object')
          instance.destroy(t.end.bind(t))
        })
      })
    })
  })
  /**
   * Enqueues `packet` for `sub` via outgoingEnqueueCombi, then builds a
   * `Packet` copy carrying `messageId` and passes it to outgoingUpdate.
   * Asserts that neither call reports an error and that outgoingUpdate hands
   * back the same client and the same updated packet object.
   *
   * @param {object} t - test handle providing `error` and `equal`
   * @param {object} instance - persistence instance under test
   * @param {object} client - client whose outgoing queue is updated
   * @param {object} sub - subscription the packet is enqueued for
   * @param {object} packet - packet to enqueue
   * @param {number} messageId - messageId assigned to the updated copy
   * @param {Function} callback - called with the updated packet copy
   */
  function enqueueAndUpdate (t, instance, client, sub, packet, messageId, callback) {
    instance.outgoingEnqueueCombi([sub], packet, enqueueErr => {
      t.error(enqueueErr)

      const stamped = new Packet(packet)
      stamped.messageId = messageId

      const onUpdated = (updateErr, returnedClient, returnedPacket) => {
        t.error(updateErr)
        t.equal(returnedClient, client, 'client matches')
        t.equal(returnedPacket, stamped, 'packet matches')
        callback(stamped)
      }
      instance.outgoingUpdate(client, stamped, onUpdated)
    })
  }
  // Enqueues a packet, updates it with messageId 42, then checks that the
  // outgoing stream yields exactly one packet that equals the updated one
  // (compared with messageId removed on both sides) without being the same object.
  testInstance('add outgoing packet and update messageId', (t, instance) => {
    const clientId = 'abcde'
    const sub = { clientId, topic: 'hello', qos: 1 }
    const client = { id: clientId }
    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false,
      brokerId: instance.broker.id,
      brokerCounter: 42
    }

    enqueueAndUpdate(t, instance, client, sub, packet, 42, updated => {
      const outgoing = instance.outgoingStream(client)
      delete updated.messageId
      getArrayFromStream(outgoing).then(streamed => {
        const head = streamed[0]
        delete head.messageId
        t.notEqual(head, updated, 'must not be the same object')
        t.deepEqual(deClassed(head), deClassed(updated), 'must return the packet')
        t.equal(streamed.length, 1, 'must return only one packet')
        instance.destroy(t.end.bind(t))
      })
    })
  })
  // Enqueues and updates two packets (messageIds 42 and 43), clears the first
  // through outgoingClearMessageId and checks that only the second is still
  // streamed from the outgoing queue.
  testInstance('add 2 outgoing packet and clear messageId', (t, instance) => {
    const clientId = 'abcde'
    const sub = { clientId, topic: 'hello', qos: 1 }
    const client = { id: clientId }

    const publishOf = (text, brokerCounter) => ({
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from(text),
      qos: 1,
      dup: false,
      length: 14,
      retain: false,
      brokerId: instance.broker.id,
      brokerCounter
    })
    const packet1 = publishOf('world', 42)
    const packet2 = publishOf('matteo', 43)

    enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => {
      enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => {
        instance.outgoingClearMessageId(client, updated1, (clearErr, cleared) => {
          t.error(clearErr)
          t.deepEqual(cleared.messageId, 42, 'must have the same messageId')
          t.deepEqual(cleared.payload.toString(), packet1.payload.toString(), 'must have original payload')
          t.deepEqual(cleared.topic, packet1.topic, 'must have original topic')

          const outgoing = instance.outgoingStream(client)
          delete updated2.messageId
          getArrayFromStream(outgoing).then(streamed => {
            const head = streamed[0]
            delete head.messageId
            t.notEqual(head, updated2, 'must not be the same object')
            t.deepEqual(deClassed(head), deClassed(updated2), 'must return the packet')
            t.equal(streamed.length, 1, 'must return only one packet')
            instance.destroy(t.end.bind(t))
          })
        })
      })
    })
  })
  // Fills the outgoing queue with twice the outgoing stream's
  // readableHighWaterMark worth of packets, checks that all of them can be
  // streamed back, clears every one through outgoingClearMessageId and
  // finally checks that the queue streams nothing.
  testInstance('add many outgoing packets and clear messageIds', async (t, instance) => {
    const sub = {
      clientId: 'abcde', topic: 'hello', qos: 1
    }
    const client = {
      id: sub.clientId
    }
    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    function outStream (instance, client) {
      return iterableStream(instance.outgoingStream(client))
    }

    // we just need a stream to figure out the high watermark
    const stream = outStream(instance, client)
    const total = stream.readableHighWaterMark * 2

    // Enqueues a packet with messageId `id` and then updates it. Rejects when
    // either persistence call reports an error.
    function submitMessage (id) {
      return new Promise((resolve, reject) => {
        const p = new Packet(packet, instance.broker)
        p.messageId = id
        instance.outgoingEnqueue(sub, p, (err) => {
          if (err) {
            return reject(err)
          }
          // outgoingUpdate reports failures through its first callback
          // argument; handing it `resolve` directly would turn such an error
          // into a silent success.
          instance.outgoingUpdate(client, p, (err) => {
            if (err) {
              return reject(err)
            }
            resolve()
          })
        })
      })
    }

    // Clears `p` from the outgoing queue and asserts that it is returned.
    function clearMessage (p) {
      return new Promise((resolve) => {
        instance.outgoingClearMessageId(client, p, (err, received) => {
          t.error(err)
          t.deepEqual(received, p, 'must return the packet')
          resolve()
        })
      })
    }

    // Counts the truthy entries produced by a fresh outgoing stream.
    async function countQueued () {
      let count = 0
      for await (const p of outStream(instance, client)) {
        if (p) {
          count++
        }
      }
      return count
    }

    for (let i = 0; i < total; i++) {
      await submitMessage(i)
    }

    const queued = await countQueued()
    t.equal(queued, total, `outgoing queue must hold ${total} items`)

    for await (const p of outStream(instance, client)) {
      await clearMessage(p)
    }

    const queued2 = await countQueued()
    t.equal(queued2, 0, 'outgoing queue is empty')
    instance.destroy(t.end.bind(t))
  })
  // Enqueues and updates two QoS 2 publishes that share messageId 42 but have
  // different brokerCounter values, then checks that both are still streamed,
  // in order, with their own brokerCounter.
  testInstance('update to publish w/ same messageId', (t, instance) => {
    const sub = {
      clientId: 'abcde', topic: 'hello', qos: 1
    }
    const client = {
      id: sub.clientId
    }
    const packet1 = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      retain: false,
      brokerId: instance.broker.id,
      brokerCounter: 42,
      messageId: 42
    }
    const packet2 = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      retain: false,
      brokerId: instance.broker.id,
      brokerCounter: 50,
      messageId: 42
    }

    // Every persistence callback asserts its error argument, so a failing
    // enqueue/update is reported at its source instead of being dropped.
    instance.outgoingEnqueue(sub, packet1, err => {
      t.error(err)
      instance.outgoingEnqueue(sub, packet2, err => {
        t.error(err)
        instance.outgoingUpdate(client, packet1, err => {
          t.error(err)
          instance.outgoingUpdate(client, packet2, err => {
            t.error(err)
            const stream = instance.outgoingStream(client)
            getArrayFromStream(stream).then(list => {
              t.equal(list.length, 2, 'must have two items in queue')
              t.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match')
              t.equal(list[0].messageId, packet1.messageId, 'messageId must match')
              t.equal(list[1].brokerCounter, packet2.brokerCounter, 'brokerCounter must match')
              t.equal(list[1].messageId, packet2.messageId, 'messageId must match')
              instance.destroy(t.end.bind(t))
            })
          })
        })
      })
    })
  })
  // Enqueues a QoS 2 publish, updates it with messageId 42, then updates the
  // same messageId with a pubrel and checks that the outgoing stream yields
  // only that pubrel.
  testInstance('update to pubrel', (t, instance) => {
    const clientId = 'abcde'
    const sub = { clientId, topic: 'hello', qos: 1 }
    const client = { id: clientId }
    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      retain: false,
      brokerId: instance.broker.id,
      brokerCounter: 42
    }

    // enqueueAndUpdate runs the enqueue + messageId update and asserts on both
    enqueueAndUpdate(t, instance, client, sub, packet, 42, updated => {
      const pubrel = { cmd: 'pubrel', messageId: updated.messageId }

      instance.outgoingUpdate(client, pubrel, pubrelErr => {
        t.error(pubrelErr)
        const outgoing = instance.outgoingStream(client)
        getArrayFromStream(outgoing).then(streamed => {
          t.deepEqual(streamed, [pubrel], 'must return the packet')
          instance.destroy(t.end.bind(t))
        })
      })
    })
  })
  // Stores an incoming QoS 2 packet, fetches it by messageId, deletes it, and
  // checks that a second fetch with the same messageId reports an error.
  testInstance('add incoming packet, get it, and clear with messageId', (t, instance) => {
    const client = { id: 'abcde' }
    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      retain: false,
      messageId: 42
    }
    // a fresh lookup object per call, keyed by the stored packet's messageId
    const byMessageId = () => ({ messageId: packet.messageId })

    instance.incomingStorePacket(client, packet, storeErr => {
      t.error(storeErr)

      instance.incomingGetPacket(client, byMessageId(), (getErr, retrieved) => {
        t.error(getErr)

        // adjusting the objects so they match
        delete retrieved.brokerCounter
        delete retrieved.brokerId
        delete packet.length

        t.deepLooseEqual(retrieved, packet, 'retrieved packet must be deeply equal')
        t.notEqual(retrieved, packet, 'retrieved packet must not be the same objet')

        instance.incomingDelPacket(client, retrieved, delErr => {
          t.error(delErr)

          instance.incomingGetPacket(client, byMessageId(), (missingErr, leftover) => {
            t.ok(missingErr, 'must error')
            instance.destroy(t.end.bind(t))
          })
        })
      })
    })
  })
  // Walks a will through its whole life cycle: putWill, getWill, delWill and a
  // final getWill that must return no packet. Every step also checks that the
  // callback receives the original client object.
  testInstance('store, fetch and delete will message', (t, instance) => {
    const client = { id: '12345' }
    const expected = {
      topic: 'hello/died',
      payload: Buffer.from('muahahha'),
      qos: 0,
      retain: true
    }

    // The steps are declared last-to-first so each can reference the next one.
    const afterSecondGet = (err, packet, c) => {
      t.error(err, 'no error')
      t.notOk(packet, 'no will after del')
      t.equal(c, client, 'client matches')
      instance.destroy(t.end.bind(t))
    }

    const afterDelete = (err, packet, c) => {
      t.error(err, 'no error')
      t.deepEqual(packet, expected, 'will matches')
      t.equal(c, client, 'client matches')
      instance.getWill(client, afterSecondGet)
    }

    const afterFirstGet = (err, packet, c) => {
      t.error(err, 'no error')
      t.deepEqual(packet, expected, 'will matches')
      t.equal(c, client, 'client matches')
      // delWill is called with the brokerId taken from the fetched will
      client.brokerId = packet.brokerId
      instance.delWill(client, afterDelete)
    }

    const afterPut = (err, c) => {
      t.error(err, 'no error')
      t.equal(c, client, 'client matches')
      instance.getWill(client, afterFirstGet)
    }

    instance.putWill(client, expected, afterPut)
  })
  // Stores one will and checks that streamWill() yields it with the client id
  // and the current broker id attached; the will is deleted and the instance
  // destroyed from inside the chunk handler.
  testInstance('stream all will messages', (t, instance) => {
    const client = { id: '12345' }
    const toWrite = {
      topic: 'hello/died',
      payload: Buffer.from('muahahha'),
      qos: 0,
      retain: true
    }

    const onWill = (chunk) => {
      const expectedChunk = {
        clientId: client.id,
        brokerId: instance.broker.id,
        topic: 'hello/died',
        payload: Buffer.from('muahahha'),
        qos: 0,
        retain: true
      }
      t.deepEqual(chunk, expectedChunk, 'packet matches')

      instance.delWill(client, (delErr, result, deletedFor) => {
        t.error(delErr, 'no error')
        instance.destroy(t.end.bind(t))
      })
    }

    instance.putWill(client, toWrite, (putErr, c) => {
      t.error(putErr, 'no error')
      t.equal(c, client, 'client matches')
      streamForEach(instance.streamWill(), onWill)
    })
  })
  // Stores one will under the original broker id and a second one after
  // switching instance.broker.id to 'anotherBroker'. streamWill is then called
  // with a map containing only 'anotherBroker', and each streamed chunk is
  // expected to be the will stored under the original broker id.
  testInstance('stream all will message for unknown brokers', (t, instance) => {
    const originalId = instance.broker.id
    const client = { id: '42' }
    const anotherClient = { id: '24' }

    const willOn = topic => ({
      topic,
      payload: Buffer.from('muahahha'),
      qos: 0,
      retain: true
    })
    const toWrite1 = willOn('hello/died42')
    const toWrite2 = willOn('hello/died24')

    const onWill = (chunk) => {
      const expectedChunk = {
        clientId: client.id,
        brokerId: originalId,
        topic: 'hello/died42',
        payload: Buffer.from('muahahha'),
        qos: 0,
        retain: true
      }
      t.deepEqual(chunk, expectedChunk, 'packet matches')

      instance.delWill(client, (delErr, result, deletedFor) => {
        t.error(delErr, 'no error')
        instance.destroy(t.end.bind(t))
      })
    }

    instance.putWill(client, toWrite1, (firstErr, c) => {
      t.error(firstErr, 'no error')
      t.equal(c, client, 'client matches')

      // the second will is written while the instance carries another broker id
      instance.broker.id = 'anotherBroker'

      instance.putWill(anotherClient, toWrite2, (secondErr, c2) => {
        t.error(secondErr, 'no error')
        t.equal(c2, anotherClient, 'client matches')

        const brokers = { anotherBroker: Date.now() }
        streamForEach(instance.streamWill(brokers), onWill)
      })
    })
  })
  // Stores a will, switches instance.broker.id to 'anotherBroker', tags the
  // client with that id and checks that delWill completes without an error.
  testInstance('delete wills from dead brokers', (t, instance) => {
    const client = { id: '42' }
    const toWrite1 = {
      topic: 'hello/died42',
      payload: Buffer.from('muahahha'),
      qos: 0,
      retain: true
    }

    const onDeleted = (delErr, result, deletedFor) => {
      t.error(delErr, 'no error')
      instance.destroy(t.end.bind(t))
    }

    instance.putWill(client, toWrite1, (putErr, c) => {
      t.error(putErr, 'no error')
      t.equal(c, client, 'client matches')

      instance.broker.id = 'anotherBroker'
      client.brokerId = instance.broker.id

      instance.delWill(client, onDeleted)
    })
  })
  // Calls outgoingClearMessageId for a client with nothing enqueued and
  // checks that the callback receives no error.
  testInstance('do not error if unkown messageId in outoingClearMessageId', (t, instance) => {
    const client = { id: 'abc-123' }
    const onCleared = clearErr => {
      t.error(clearErr)
      instance.destroy(t.end.bind(t))
    }

    instance.outgoingClearMessageId(client, 42, onCleared)
  })
  1439. }
  1440. module.exports = abstractPersistence