qos2.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. 'use strict'
  2. const { test } = require('tap')
  3. const concat = require('concat-stream')
  4. const { setup, connect, subscribe } = require('./helper')
  5. const aedes = require('../')
  /**
   * Plays the publishing client's side of a QoS 2 exchange: writes `packet`,
   * asserts that a matching pubrec comes back, answers it with a pubrel and
   * asserts that a matching pubcomp comes back.
   *
   * Every 'data' listener is attached before the write that is expected to
   * trigger it, so a reply emitted synchronously from `write` is not missed.
   *
   * @param {object} t - test object; only `t.same` is used here
   * @param {object} s - connection exposing `inStream` (written to) and
   *   `outStream` (listened on)
   * @param {object} packet - publish packet to write; its `messageId` must be
   *   echoed by both acknowledgements
   * @param {Function} [done] - invoked after the pubcomp has been asserted
   */
  function publish (t, s, packet, done) {
    const msgId = packet.messageId

    s.outStream.once('data', function (pubrec) {
      t.same(pubrec, {
        cmd: 'pubrec',
        messageId: msgId,
        length: 2,
        dup: false,
        retain: false,
        qos: 0
      }, 'pubrec must match')

      // Listeners added while 'data' is being emitted are not invoked for the
      // current emission, so this one only sees the next packet (the pubcomp).
      s.outStream.once('data', function (pubcomp) {
        t.same(pubcomp, {
          cmd: 'pubcomp',
          messageId: msgId,
          length: 2,
          dup: false,
          retain: false,
          qos: 0
        }, 'pubcomp must match')

        if (done) {
          done()
        }
      })

      s.inStream.write({
        cmd: 'pubrel',
        messageId: msgId
      })
    })

    s.inStream.write(packet)
  }
  /**
   * Plays the subscribing client's side of a QoS 2 delivery: waits for the
   * forwarded publish, checks it against `expected` (ignoring `messageId`,
   * which must differ from the one in `expected`), answers with pubrec, waits
   * for the pubrel, answers with pubcomp and asserts the pubrel's shape.
   *
   * Neither `expected` nor the received packet is mutated; the comparison is
   * done on shallow copies with `messageId` left out, so callers can keep
   * using `expected` (including its `messageId`) afterwards.
   *
   * @param {object} t - test object; `t.not` and `t.same` are used
   * @param {object} subscriber - connection exposing `inStream` and `outStream`
   * @param {object} expected - publish packet the subscriber should receive
   * @param {Function} [done] - invoked after the pubrel has been asserted
   */
  function receive (t, subscriber, expected, done) {
    subscriber.outStream.once('data', function (packet) {
      const { messageId: msgId, ...received } = packet
      const { messageId: originalId, ...wanted } = expected

      t.not(msgId, originalId, 'messageId must differ')
      t.same(received, wanted, 'packet must match')

      // Attach before writing the pubrec so a pubrel emitted synchronously
      // from `write` is still observed.
      subscriber.outStream.once('data', function (pubrel) {
        subscriber.inStream.write({
          cmd: 'pubcomp',
          messageId: msgId
        })

        t.same(pubrel, {
          cmd: 'pubrel',
          messageId: msgId,
          length: 2,
          qos: 1,
          retain: false,
          dup: false
        }, 'pubrel must match')

        if (done) {
          done()
        }
      })

      subscriber.inStream.write({
        cmd: 'pubrec',
        messageId: msgId
      })
    })
  }
  // A lone QoS 2 publish: the two planned assertions are the pubrec and
  // pubcomp checks made inside publish().
  test('publish QoS 2', (t) => {
    t.plan(2)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    publish(t, s, {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 2,
      messageId: 42
    })
  })
  // End-to-end QoS 2: one client publishes, another (subscribed at QoS 2)
  // receives and completes the handshake.
  test('subscribe QoS 2', (t) => {
    t.plan(8)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const message = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      messageId: 42,
      dup: false,
      length: 14,
      retain: false
    }

    subscribe(t, subscriber, 'hello', 2, () => {
      publish(t, publisher, message)
      receive(t, subscriber, message)
    })
  })
  // Marks the broker-side client as neither connected nor connecting, then
  // publishes at QoS 2 and expects the broker to report 'connection closed'.
  test('publish QoS 2 throws error on write', (t) => {
    t.plan(1)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    s.broker.on('client', (client) => {
      client.connected = false
      client.connecting = false

      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 2,
        messageId: 42
      }
      s.inStream.write(outgoing)
    })

    s.broker.on('clientError', (client, err) => {
      t.equal(err.message, 'connection closed', 'throws error')
    })
  })
  // Calls the pubrec handler directly with a persistence whose outgoingUpdate
  // always fails, and checks that the handler still invokes its callback.
  test('pubrec handler calls done when outgoingUpdate fails (clean=false)', (t) => {
    t.plan(1)

    const s = connect(setup(), { clean: false })
    t.teardown(s.broker.close.bind(s.broker))

    const handlePubrec = require('../lib/handlers/pubrec.js')

    s.broker.persistence.outgoingUpdate = (client, pubrel, done) => {
      done(new Error('throws error'))
    }

    handlePubrec(s.client, { messageId: 42 }, () => {
      t.pass('calls done on error')
    })
  })
  // Publishes through the broker-side client object (captured from the
  // 'client' event) and lets the subscriber complete the QoS 2 handshake.
  test('client.publish with clean=true subscribption QoS 2', (t) => {
    t.plan(8)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const toPublish = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      messageId: 42,
      dup: false,
      length: 14,
      retain: false
    }

    let brokerClient = null
    broker.on('client', (client) => {
      brokerClient = client
      // Any error on the broker-side client fails the test.
      client.on('error', (err) => {
        t.error(err)
      })
    })

    const subscriber = connect(setup(broker), { clean: true })

    subscribe(t, subscriber, 'hello', 2, () => {
      t.pass('subscribed')
      receive(t, subscriber, toPublish)
      brokerClient.publish(toPublish, (err) => {
        t.error(err)
      })
    })
  })
  // Overrides broker.published and checks that a client is passed for every
  // publish whose topic does not start with the '$SYS' segment.
  test('call published method with client with QoS 2', (t) => {
    t.plan(9)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const message = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      messageId: 42,
      dup: false,
      length: 14,
      retain: false
    }

    broker.published = (packet, client, cb) => {
      // Client is null for all server publishes
      const [firstSegment] = packet.topic.split('/')
      if (firstSegment === '$SYS') {
        // NOTE(review): cb is intentionally not invoked here, as before.
        return
      }
      t.ok(client, 'client must be passed to published method')
      cb()
    }

    subscribe(t, subscriber, 'hello', 2, () => {
      publish(t, publisher, message)
      receive(t, subscriber, message)
    })
  })
  // Registers the same scenario twice, once per clean-session setting of the
  // subscriber: authorizeForward lets the packet through, and afterwards the
  // subscriber's outgoing queue is inspected.
  for (const cleanSession of [true, false]) {
    test(`authorized forward publish packets in QoS 2 [clean=${cleanSession}]`, (t) => {
      t.plan(9)

      const broker = aedes()
      t.teardown(broker.close.bind(broker))

      const publisher = connect(setup(broker), { clientId: 'my-client-xyz-8' })
      const subscriber = connect(setup(broker), { clean: cleanSession, clientId: 'abcde' })

      // What authorizeForward is expected to be handed; brokerId and
      // brokerCounter are filled in when the hook runs.
      const forwarded = {
        cmd: 'publish',
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2,
        retain: false,
        dup: false,
        messageId: undefined,
        clientId: 'my-client-xyz-8'
      }

      // What the subscriber should see once messageId has been stripped.
      const expected = {
        cmd: 'publish',
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2,
        retain: false,
        length: 14,
        dup: false
      }

      broker.authorizeForward = (client, packet) => {
        forwarded.brokerId = broker.id
        forwarded.brokerCounter = broker.counter
        delete packet.nl
        t.same(packet, forwarded, 'forwarded packet must match')
        return packet
      }

      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2,
        retain: false,
        messageId: 42,
        dup: false
      }

      subscribe(t, subscriber, 'hello', 2, () => {
        subscriber.outStream.once('data', (delivered) => {
          t.not(delivered.messageId, 42)
          delete delivered.messageId
          t.same(delivered, expected, 'packet must match')
        })

        publish(t, publisher, outgoing, () => {
          const queued = broker.persistence.outgoingStream({ id: 'abcde' })
          queued.pipe(concat((list) => {
            const wantedLength = cleanSession ? 0 : 1
            const label = cleanSession
              ? 'should have empty item in queue'
              : 'should have one item in queue'
            t.equal(list.length, wantedLength, label)
          }))
        })
      })
    })
  }
  // Registers the same scenario twice, once per clean-session setting of the
  // subscriber: authorizeForward returns nothing, so the subscriber must not
  // receive anything and its outgoing queue must stay empty.
  for (const cleanSession of [true, false]) {
    test(`unauthorized forward publish packets in QoS 2 [clean=${cleanSession}]`, (t) => {
      t.plan(6)

      const broker = aedes()
      t.teardown(broker.close.bind(broker))

      const publisher = connect(setup(broker))
      const subscriber = connect(setup(broker), { clean: cleanSession, clientId: 'abcde' })

      // Returning undefined from the hook is the "deny" case under test.
      broker.authorizeForward = (client, packet) => {}

      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2,
        retain: false,
        messageId: 42,
        dup: false
      }

      subscribe(t, subscriber, 'hello', 2, () => {
        subscriber.outStream.once('data', () => {
          t.fail('should not receive any packets')
        })

        publish(t, publisher, outgoing, () => {
          const queued = broker.persistence.outgoingStream({ id: 'abcde' })
          queued.pipe(concat((list) => {
            t.equal(list.length, 0, 'should empty in queue')
          }))
        })
      })
    })
  }
  // A QoS 2 publish reaching a QoS 0 subscription must be delivered at QoS 0
  // (no messageId on the delivered packet).
  test('subscribe QoS 0, but publish QoS 2', (t) => {
    t.plan(6)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 0,
      dup: false,
      length: 12,
      retain: false
    }

    const outgoing = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      retain: false,
      messageId: 42,
      dup: false
    }

    subscribe(t, subscriber, 'hello', 0, () => {
      subscriber.outStream.once('data', (delivered) => {
        t.same(delivered, expected, 'packet must match')
      })

      publish(t, publisher, outgoing)
    })
  })
  // A QoS 2 publish reaching a QoS 1 subscription must be delivered at QoS 1;
  // the delivered messageId is dropped before the comparison.
  test('subscribe QoS 1, but publish QoS 2', (t) => {
    t.plan(6)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    const outgoing = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      retain: false,
      messageId: 42,
      dup: false
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscriber.outStream.once('data', (delivered) => {
        delete delivered.messageId
        t.same(delivered, expected, 'packet must match')
      })

      publish(t, publisher, outgoing)
    })
  })
  // Subscribes with clean=false, drops the connection, reconnects under the
  // same clientId and expects the session (and its QoS 2 subscription) to be
  // present, so a later publish is delivered.
  test('restore QoS 2 subscriptions not clean', (t) => {
    t.plan(9)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    // A fresh options object per connection, as in the two literal objects
    // this replaces.
    const sessionOpts = () => ({ clean: false, clientId: 'abcde' })

    let subscriber = connect(setup(broker), sessionOpts())

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      messageId: 42,
      retain: false
    }

    subscribe(t, subscriber, 'hello', 2, () => {
      subscriber.inStream.end()

      const publisher = connect(setup(broker))

      subscriber = connect(setup(broker), sessionOpts(), (connack) => {
        t.equal(connack.sessionPresent, true, 'session present is set to true')
        publish(t, publisher, expected)
      })

      // Listens on the re-established connection assigned just above.
      receive(t, subscriber, expected)
    })
  })
  // Publishes while the clean=false subscriber is offline; after it
  // reconnects under the same clientId the message must be delivered.
  test('resend publish on non-clean reconnect QoS 2', (t) => {
    t.plan(8)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const opts = { clean: false, clientId: 'abcde' }
    const offlineSubscriber = connect(setup(broker), opts)

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      messageId: 42,
      retain: false
    }

    subscribe(t, offlineSubscriber, 'hello', 2, () => {
      offlineSubscriber.inStream.end()

      const publisher = connect(setup(broker))

      publish(t, publisher, expected, () => {
        const reconnected = connect(setup(broker), opts)
        receive(t, reconnected, expected)
      })
    })
  })
  // Three sessions under one clientId (clean=false):
  //   1. subscribe, then disconnect;
  //   2. reconnect, receive the publish, send pubrec, see the pubrel, then
  //      disconnect without sending pubcomp;
  //   3. reconnect again and expect the same pubrel to be sent once more.
  test('resend pubrel on non-clean reconnect QoS 2', (t) => {
    t.plan(9)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const opts = { clean: false, clientId: 'abcde' }
    const firstSession = connect(setup(broker), opts)

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      dup: false,
      length: 14,
      messageId: 42,
      retain: false
    }

    subscribe(t, firstSession, 'hello', 2, () => {
      firstSession.inStream.end()

      const publisher = connect(setup(broker))

      publish(t, publisher, expected, () => {
        const secondSession = connect(setup(broker), opts)

        secondSession.outStream.once('data', (delivered) => {
          t.not(delivered.messageId, expected.messageId, 'messageId must differ')

          const msgId = delivered.messageId
          delete delivered.messageId
          delete expected.messageId
          t.same(delivered, expected, 'packet must match')

          // Both the first pubrel and the re-sent one must look like this.
          const wantedPubrel = {
            cmd: 'pubrel',
            messageId: msgId,
            length: 2,
            qos: 1,
            retain: false,
            dup: false
          }

          secondSession.inStream.write({
            cmd: 'pubrec',
            messageId: msgId
          })

          secondSession.outStream.once('data', (firstPubrel) => {
            t.same(firstPubrel, wantedPubrel, 'pubrel must match')

            secondSession.inStream.end()

            const thirdSession = connect(setup(broker), opts)

            thirdSession.outStream.once('data', (resentPubrel) => {
              t.same(resentPubrel, wantedPubrel, 'pubrel must match')

              thirdSession.inStream.write({
                cmd: 'pubcomp',
                messageId: msgId
              })
            })
          })
        })
      })
    })
  })
  // Runs one full QoS 2 publish/receive round and, once the subscriber has
  // seen the pubrel, publishes a second message from the same publisher.
  test('publish after disconnection', (t) => {
    t.plan(10)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const first = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      messageId: 42,
      dup: false,
      length: 14,
      retain: false
    }

    const second = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('worl2'),
      qos: 2,
      messageId: 43,
      dup: false,
      length: 14,
      retain: false
    }

    subscribe(t, subscriber, 'hello', 2, () => {
      publish(t, publisher, first)
      receive(t, subscriber, first, () => {
        publish(t, publisher, second)
      })
    })
  })
  // Writes the same QoS 2 publish five times, waits for five packets back,
  // closes the broker and then checks that the incoming store holds the
  // packet for that client / messageId.
  test('multiple publish and store one', function (t) {
    t.plan(2)

    const REPEAT = 5

    const broker = aedes()
    const sid = {
      id: 'abcde'
    }
    const s = connect(setup(broker), { clientId: sid.id })

    const toPublish = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 2,
      retain: false,
      dup: false,
      messageId: 42
    }

    for (let i = 0; i < REPEAT; i++) {
      s.inStream.write(toPublish)
    }

    let received = 0
    s.outStream.on('data', function () {
      if (++received < REPEAT) return

      broker.close(function () {
        broker.persistence.incomingGetPacket(sid, toPublish, function (err, origPacket) {
          // Check the error first: on failure origPacket may be missing, and
          // touching it would throw instead of reporting a failed assertion.
          t.error(err)

          if (origPacket) {
            // Broker bookkeeping fields are not part of what was published.
            delete origPacket.brokerId
            delete origPacket.brokerCounter
          }
          t.same(origPacket, toPublish, 'packet must match')
        })
      })
    })
  })
  // Wraps persistence.incomingStorePacket to record that the packet was
  // stored, then checks that the flag is already set when the pubrec arrives.
  test('packet is written to stream after being stored', function (t) {
    // 1 'packet stored' + 2 checks in the listener below + the pubrec and
    // pubcomp checks inside publish(). Planning (instead of calling t.end()
    // on the pubrec) keeps the test open until publish()'s pending pubcomp
    // assertion has run, so it can no longer land after the test ended.
    t.plan(5)

    const s = connect(setup())
    const broker = s.broker
    t.teardown(broker.close.bind(s.broker))

    let packetStored = false

    const storePacket = broker.persistence.incomingStorePacket.bind(broker.persistence)

    s.broker.persistence.incomingStorePacket = function (client, packet, done) {
      packetStored = true
      t.pass('packet stored')
      storePacket(client, packet, done)
    }

    const packet = {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 2,
      messageId: 42
    }

    publish(t, s, packet)

    s.outStream.once('data', function (reply) {
      t.equal(reply.cmd, 'pubrec', 'pubrec received')
      t.equal(packetStored, true, 'after packet store')
    })
  })
  // Makes incomingStorePacket fail and expects the error to surface through
  // 'clientError' while nothing is written back to the client.
  test('not send pubrec when persistence fails to store packet', (t) => {
    t.plan(2)

    const s = connect(setup())
    const broker = s.broker
    t.teardown(broker.close.bind(s.broker))

    s.broker.persistence.incomingStorePacket = (client, packet, done) => {
      t.pass('packet stored')
      done(new Error('store error'))
    }

    s.broker.on('clientError', (client, err) => {
      t.equal(err.message, 'store error')
    })

    s.inStream.write({
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 2,
      messageId: 42
    })

    s.outStream.once('data', () => {
      t.fail('should not have pubrec')
    })
  })