qos1.js 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880
  1. 'use strict'
  2. const { test } = require('tap')
  3. const concat = require('concat-stream')
  4. const { setup, connect, subscribe } = require('./helper')
  5. const Faketimers = require('@sinonjs/fake-timers')
  6. const aedes = require('../')
  // A QoS 1 publish written by the client must be answered on the client's
  // output stream with a puback that echoes the same messageId.
  test('publish QoS 1', (t) => {
    t.plan(1)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    const wantedAck = {
      cmd: 'puback',
      messageId: 42,
      qos: 0,
      dup: false,
      length: 2,
      retain: false
    }
    const outgoing = {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      messageId: 42
    }

    s.inStream.write(outgoing)

    s.outStream.on('data', (reply) => {
      t.same(reply, wantedAck, 'packet must match')
    })
  })
  // When persistence fails while looking up subscriptions for a QoS 1 publish,
  // the failure must surface as an 'error' event on the broker.
  test('publish QoS 1 throws error', function (t) {
    t.plan(1)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    // Force the subscription lookup to fail for every publish.
    s.broker.persistence.subscriptionsByTopic = function (packet, done) {
      return done(new Error('Throws error'))
    }

    // Attach the listener before triggering the publish so the event cannot be
    // missed, and pass (found, wanted) in tap's order so a failure report
    // labels the actual and expected values correctly.
    s.broker.on('error', function (err) {
      t.equal(err.message, 'Throws error', 'Throws error')
    })

    s.inStream.write({
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      messageId: 42
    })
  })
  // Marks the broker-side client as neither connected nor connecting as soon
  // as it is announced, then publishes; the broker is expected to report a
  // 'connection closed' clientError.
  test('publish QoS 1 throws error on write', (t) => {
    t.plan(1)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    const publishWhileDisconnected = (client) => {
      client.connected = false
      client.connecting = false

      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      }
      s.inStream.write(outgoing)
    }

    s.broker.on('client', publishWhileDisconnected)

    s.broker.on('clientError', (_client, err) => {
      t.equal(err.message, 'connection closed', 'throws error')
    })
  })
  // Publishes two QoS 1 messages to a non-clean subscriber and, once both have
  // been delivered, asks persistence to clear each delivered packet and checks
  // that the stored original matches what the publisher sent.
  test('publish QoS 1 and check offline queue', (t) => {
    t.plan(13)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker), { clean: false })
    const subscriberClient = { id: 'abcde' }
    const subscriber = connect(setup(broker), { clean: false, clientId: subscriberClient.id })

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      qos: 1,
      dup: false,
      retain: false
    }
    const expectedAck = {
      cmd: 'puback',
      retain: false,
      qos: 0,
      dup: false,
      length: 2,
      messageId: 10
    }
    const sent = {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      messageId: 10,
      retain: false,
      dup: false
    }
    const delivered = []

    const onAck = (ack) => {
      t.same(ack, expectedAck, 'ack packet must patch')
    }

    // Compares a packet returned by persistence with `sent`, after dropping
    // the fields that are removed from both sides before the comparison.
    const checkStored = (_, origPacket) => {
      if (!origPacket) {
        return
      }
      delete origPacket.brokerId
      delete origPacket.brokerCounter
      delete origPacket.payload
      delete origPacket.messageId
      delete sent.payload
      delete sent.messageId
      t.same(origPacket, sent, 'origPacket must match')
    }

    const onDelivery = (packet) => {
      delivered.push(packet)
      delete packet.payload
      delete packet.length
      t.not(packet.messageId, undefined, 'messageId is assigned a value')
      t.not(packet.messageId, 10, 'messageId should be unique')
      expected.messageId = packet.messageId
      t.same(packet, expected, 'publish packet must patch')

      if (delivered.length !== 2) {
        return
      }
      setImmediate(() => {
        for (const entry of delivered) {
          broker.persistence.outgoingClearMessageId(subscriberClient, entry, checkStored)
        }
      })
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      publisher.outStream.on('data', onAck)
      subscriber.outStream.on('data', onDelivery)

      // The same object is written twice, with a different payload the second time.
      publisher.inStream.write(sent)
      sent.payload = 'world2world'
      publisher.inStream.write(sent)
    })
  })
  // Publishes two QoS 1 messages to a non-clean subscriber, empties the
  // client's outgoing queue, and checks that persistence no longer returns
  // either delivered packet.
  test('publish QoS 1 and empty offline queue', (t) => {
    t.plan(13)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker), { clean: false })
    const subscriberClient = { id: 'abcde' }
    const subscriber = connect(setup(broker), { clean: false, clientId: subscriberClient.id })

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      qos: 1,
      dup: false,
      retain: false
    }
    const expectedAck = {
      cmd: 'puback',
      retain: false,
      qos: 0,
      dup: false,
      length: 2,
      messageId: 10
    }
    const sent = {
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      messageId: 10,
      retain: false,
      dup: false
    }
    const delivered = []

    const onAck = (ack) => {
      t.same(ack, expectedAck, 'ack packet must patch')
    }

    const checkRemoved = (_, origPacket) => {
      t.equal(!!origPacket, false, 'Packet has been removed')
    }

    const onDelivery = (packet) => {
      delivered.push(packet)
      delete packet.payload
      delete packet.length
      t.not(packet.messageId, undefined, 'messageId is assigned a value')
      t.not(packet.messageId, 10, 'messageId should be unique')
      expected.messageId = packet.messageId
      t.same(packet, expected, 'publish packet must patch')

      if (delivered.length !== 2) {
        return
      }
      setImmediate(() => {
        broker.clients[subscriberClient.id].emptyOutgoingQueue(() => {
          for (const entry of delivered) {
            broker.persistence.outgoingClearMessageId(subscriberClient, entry, checkRemoved)
          }
        })
      })
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      publisher.outStream.on('data', onAck)
      subscriber.outStream.on('data', onDelivery)

      // The same object is written twice, with a different payload the second time.
      publisher.inStream.write(sent)
      sent.payload = 'world2world'
      publisher.inStream.write(sent)
    })
  })
  // A QoS 1 subscriber receives a QoS 1 publish under a messageId that differs
  // from the publisher's, and acknowledges it with a puback.
  test('subscribe QoS 1', (t) => {
    t.plan(5)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    const onDelivery = (packet) => {
      const ack = { cmd: 'puback', messageId: packet.messageId }
      subscriber.inStream.write(ack)

      t.not(packet.messageId, 42, 'messageId must differ')
      // messageId is removed before comparing; `expected` does not carry one.
      delete packet.messageId
      t.same(packet, expected, 'packet must match')
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscriber.outStream.once('data', onDelivery)

      publisher.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      })
    })
  })
  // A QoS 1 publish delivered to a QoS 0 subscription is expected to arrive
  // as a QoS 0 packet (no messageId in the expected packet).
  test('subscribe QoS 0, but publish QoS 1', (t) => {
    t.plan(4)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const publisher = connect(setup(broker))
    const subscriber = connect(setup(broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 0,
      dup: false,
      length: 12,
      retain: false
    }

    subscribe(t, subscriber, 'hello', 0, () => {
      subscriber.outStream.once('data', (delivery) => {
        t.same(delivery, expected, 'packet must match')
      })

      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      }
      publisher.inStream.write(outgoing)
    })
  })
  // A non-clean client that subscribed, disconnected and reconnected with the
  // same clientId must see sessionPresent=true and still receive publishes on
  // its earlier subscription.
  test('restore QoS 1 subscriptions not clean', (t) => {
    t.plan(7)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const sessionOpts = { clean: false, clientId: 'abcde' }
    let subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscriber.inStream.end()

      const publisher = connect(setup(broker))

      // Publish only once the reconnecting client has been acknowledged.
      subscriber = connect(setup(broker), sessionOpts, (connack) => {
        t.equal(connack.sessionPresent, true, 'session present is set to true')
        publisher.inStream.write({
          cmd: 'publish',
          topic: 'hello',
          payload: 'world',
          qos: 1,
          messageId: 42
        })
      })

      publisher.outStream.once('data', (ack) => {
        t.equal(ack.cmd, 'puback')
      })

      subscriber.outStream.once('data', (delivery) => {
        subscriber.inStream.write({
          cmd: 'puback',
          messageId: delivery.messageId
        })
        t.not(delivery.messageId, 42, 'messageId must differ')
        delete delivery.messageId
        t.same(delivery, expected, 'packet must match')
      })
    })
  })
  // Subscribes a non-clean client to 'hello' and 'foo', then reconnects it
  // with an authorizeSubscribe hook that answers 123 instead of the
  // subscription for 'hello'. Only the 'foo' publish is expected to be delivered.
  // NOTE(review): presumably the non-object answer makes the broker drop the
  // restored 'hello' subscription - confirm against aedes' authorizeSubscribe docs.
  test('restore multiple QoS 1 subscriptions not clean w/ authorizeSubscribe', (t) => {
    t.plan(11)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const sessionOpts = { clean: false, clientId: 'abcde' }
    let subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })

    const expected = {
      cmd: 'publish',
      topic: 'foo',
      payload: Buffer.from('bar'),
      qos: 1,
      dup: false,
      length: 10,
      retain: false
    }
    const publisher = connect(setup(broker))

    const afterBothSubscribed = () => {
      subscriber.inStream.end()

      broker.authorizeSubscribe = (client, sub, done) => {
        if (sub.topic === 'hello') {
          done(null, 123)
        } else {
          done(null, sub)
        }
      }

      subscriber = connect(setup(broker), sessionOpts, (connack) => {
        t.equal(connack.sessionPresent, true, 'session present is set to true')
        publisher.inStream.write({
          cmd: 'publish',
          topic: 'hello',
          payload: 'world',
          qos: 1,
          messageId: 42
        })
        publisher.inStream.write({
          cmd: 'publish',
          topic: 'foo',
          payload: 'bar',
          qos: 1,
          messageId: 48
        })
      })

      publisher.outStream.on('data', (ack) => {
        t.equal(ack.cmd, 'puback')
      })

      subscriber.outStream.on('data', (delivery) => {
        subscriber.inStream.write({
          cmd: 'puback',
          messageId: delivery.messageId
        })
        t.not(delivery.messageId, 48, 'messageId must differ')
        delete delivery.messageId
        t.same(delivery, expected, 'packet must match')
      })
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscribe(t, subscriber, 'foo', 1, afterBothSubscribed)
    })
  })
  // After a non-clean client reconnects once with clean=true, its stored
  // subscription must be gone: neither that session nor a later non-clean
  // session may report sessionPresent or receive a publish.
  test('remove stored subscriptions if connected with clean=true', (t) => {
    t.plan(5)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    let subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })

    const failOnDelivery = () => {
      t.fail('publish received')
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscriber.inStream.end()

      const publisher = connect(setup(broker))

      // `subscriber` is reassigned on purpose: each callback below acts on
      // whichever session is current at the time it runs.
      subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }, (cleanConnack) => {
        t.equal(cleanConnack.sessionPresent, false, 'session present is set to false')
        publisher.inStream.write({
          cmd: 'publish',
          topic: 'hello',
          payload: 'world',
          qos: 1,
          messageId: 42
        })

        subscriber.inStream.end()

        subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, (connack) => {
          t.equal(connack.sessionPresent, false, 'session present is set to false')
          publisher.inStream.write({
            cmd: 'publish',
            topic: 'hello',
            payload: 'world',
            qos: 1,
            messageId: 43
          })
        })

        subscriber.outStream.once('data', failOnDelivery)
      })

      subscriber.outStream.once('data', failOnDelivery)
    })
  })
  // Two QoS 1 messages are published while a non-clean subscriber is offline.
  // On reconnect the first is delivered and acknowledged; the persistence
  // outgoing stream is then expected to hold only the second one.
  test('resend publish on non-clean reconnect QoS 1', (t) => {
    t.plan(8)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const opts = { clean: false, clientId: 'abcde' }
    let subscriber = connect(setup(broker), opts)
    const subscriberClient = { id: opts.clientId }

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    // Reads what is still queued for the subscriber once the puback has been written.
    const inspectRemainingQueue = () => {
      const stream = broker.persistence.outgoingStream(subscriberClient)
      const collector = concat((list) => {
        t.equal(list.length, 1, 'should remain one item in queue')
        t.same(list[0].payload, Buffer.from('world world'), 'packet must match')
      })
      stream.pipe(collector)
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscriber.inStream.end()

      const publisher = connect(setup(broker))

      // Both publishes carry messageId 42, as in the original test.
      publisher.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      })
      publisher.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world world',
        qos: 1,
        messageId: 42
      })

      publisher.outStream.once('data', (ack) => {
        t.equal(ack.cmd, 'puback')

        subscriber = connect(setup(broker), opts)
        subscriber.outStream.once('data', (delivery) => {
          subscriber.inStream.write({
            cmd: 'puback',
            messageId: delivery.messageId
          })
          t.not(delivery.messageId, 42, 'messageId must differ')
          delete delivery.messageId
          t.same(delivery, expected, 'packet must match')
          setImmediate(inspectRemainingQueue)
        })
      })
    })
  })
  // Publishes twice the stream high-water mark of QoS 1 messages while a
  // non-clean subscriber is offline, then reconnects it and counts deliveries.
  // A fake clock is advanced by one tick per delivered packet; its timer, set
  // to `total` ticks, closes the broker and asserts that every message arrived.
  test('resend many publish on non-clean reconnect QoS 1', function (t) {
    t.plan(4)

    const broker = aedes()
    const clock = Faketimers.createClock()

    // The previous teardown only evaluated `broker.close.bind(broker)` and
    // `clock.reset.bind(clock)` without calling the results, so nothing was
    // cleaned up. Call both so the broker is closed and pending fake timers
    // are dropped even if the test ends before the timer below fires.
    t.teardown(() => {
      broker.close()
      clock.reset()
    })

    const opts = { clean: false, clientId: 'abcde' }
    let subscriber = connect(setup(broker), opts)
    const publisher = connect(setup(broker))

    const { through } = require('../lib/utils')
    const total = through().writableHighWaterMark * 2

    let received = 0
    clock.setTimeout(() => {
      broker.close()
      t.equal(received, total)
    }, total)

    subscribe(t, subscriber, 'hello', 1, function () {
      subscriber.inStream.end()

      for (let sent = 0; sent < total; sent++) {
        publisher.inStream.write({
          cmd: 'publish',
          topic: 'hello',
          payload: `message-${sent}`,
          qos: 1,
          messageId: 42 + sent
        })
      }

      // Reconnect as soon as the publisher gets its first packet back.
      publisher.outStream.once('data', function () {
        subscriber = connect(setup(broker), opts)
        subscriber.outStream.on('data', function (packet) {
          subscriber.inStream.write({
            cmd: 'puback',
            messageId: packet.messageId
          })
          received++
          clock.tick(1)
        })
      })
    })
  })
  // A message queued for an offline non-clean subscriber is delivered on the
  // first reconnect and acknowledged; a second reconnect must not receive it again.
  test('do not resend QoS 1 packets at each reconnect', (t) => {
    t.plan(6)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const firstSession = connect(setup(broker), { clean: false, clientId: 'abcde' })

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    subscribe(t, firstSession, 'hello', 1, () => {
      firstSession.inStream.end()

      const publisher = connect(setup(broker))
      publisher.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      })

      publisher.outStream.once('data', (ack) => {
        t.equal(ack.cmd, 'puback')

        const secondSession = connect(setup(broker), { clean: false, clientId: 'abcde' })
        secondSession.outStream.once('data', (delivery) => {
          // Acknowledge and close the input stream in a single call.
          secondSession.inStream.end({
            cmd: 'puback',
            messageId: delivery.messageId
          })
          t.not(delivery.messageId, 42, 'messageId must differ')
          delete delivery.messageId
          t.same(delivery, expected, 'packet must match')

          const thirdSession = connect(setup(broker), { clean: false, clientId: 'abcde' })
          thirdSession.outStream.once('data', () => {
            t.fail('this should never happen')
          })
        })
      })
    })
  })
  // A message queued for an offline non-clean subscriber must not be delivered
  // when that client comes back with clean=true.
  test('do not resend QoS 1 packets if reconnect is clean', (t) => {
    t.plan(4)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const offlineSession = connect(setup(broker), { clean: false, clientId: 'abcde' })

    subscribe(t, offlineSession, 'hello', 1, () => {
      offlineSession.inStream.end()

      const publisher = connect(setup(broker))
      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      }
      publisher.inStream.write(outgoing)

      publisher.outStream.once('data', (ack) => {
        t.equal(ack.cmd, 'puback')

        const cleanSession = connect(setup(broker), { clean: true, clientId: 'abcde' })
        cleanSession.outStream.once('data', () => {
          t.fail('this should never happen')
        })
      })
    })
  })
  // A message that the subscriber already acknowledged while online must not
  // be delivered again after it reconnects with the same non-clean session.
  test('do not resend QoS 1 packets at reconnect if puback was received', (t) => {
    t.plan(5)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const onlineSession = connect(setup(broker), { clean: false, clientId: 'abcde' })

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    subscribe(t, onlineSession, 'hello', 1, () => {
      const publisher = connect(setup(broker))
      publisher.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      })

      publisher.outStream.once('data', (ack) => {
        t.equal(ack.cmd, 'puback')
      })

      onlineSession.outStream.once('data', (delivery) => {
        // Acknowledge and close the input stream in a single call.
        onlineSession.inStream.end({
          cmd: 'puback',
          messageId: delivery.messageId
        })
        delete delivery.messageId
        t.same(delivery, expected, 'packet must match')

        const reconnected = connect(setup(broker), { clean: false, clientId: 'abcde' })
        reconnected.outStream.once('data', () => {
          t.fail('this should never happen')
        })
      })
    })
  })
  // After a non-clean client unsubscribes and disconnects, a reconnect with
  // the same clientId must report sessionPresent=false and receive nothing
  // published on the former topic.
  test('remove stored subscriptions after unsubscribe', (t) => {
    t.plan(5)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    let subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })

    const wantedUnsuback = {
      cmd: 'unsuback',
      messageId: 43,
      dup: false,
      length: 2,
      qos: 0,
      retain: false
    }
    const failOnDelivery = () => {
      t.fail('publish received')
    }

    subscribe(t, subscriber, 'hello', 1, () => {
      subscriber.inStream.write({
        cmd: 'unsubscribe',
        messageId: 43,
        unsubscriptions: ['hello']
      })

      subscriber.outStream.once('data', (reply) => {
        t.same(reply, wantedUnsuback, 'packet matches')

        subscriber.inStream.end()

        const publisher = connect(setup(broker))

        // `subscriber` is reassigned on purpose: the callbacks below act on
        // the reconnected session.
        subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, (connack) => {
          t.equal(connack.sessionPresent, false, 'session present is set to false')
          publisher.inStream.write({
            cmd: 'publish',
            topic: 'hello',
            payload: 'world',
            qos: 1,
            messageId: 42
          })
          // Close the subscriber once the second publish has been written.
          publisher.inStream.write({
            cmd: 'publish',
            topic: 'hello',
            payload: 'world',
            qos: 1,
            messageId: 43
          }, () => {
            subscriber.inStream.end()
          })
          subscriber.outStream.once('data', failOnDelivery)
        })

        subscriber.outStream.once('data', failOnDelivery)
      })
    })
  })
  // Subscribing to the same topic first at QoS 0 and then at QoS 1 must make a
  // broker-originated QoS 1 publish arrive as QoS 1 with a messageId.
  test('upgrade a QoS 0 subscription to QoS 1', (t) => {
    t.plan(8)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      length: 14,
      retain: false,
      dup: false
    }

    const publishAndCheck = () => {
      s.outStream.once('data', (delivery) => {
        t.ok(delivery.messageId, 'has messageId')
        delete delivery.messageId
        t.same(delivery, expected, 'packet matches')
      })

      s.broker.publish({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1
      })
    }

    subscribe(t, s, 'hello', 0, () => {
      subscribe(t, s, 'hello', 1, publishAndCheck)
    })
  })
  // A broker-originated QoS 0 publish must reach a QoS 1 subscription as a
  // QoS 0 packet.
  test('downgrade QoS 0 publish on QoS 1 subsciption', (t) => {
    t.plan(4)

    const s = connect(setup())
    t.teardown(s.broker.close.bind(s.broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 0,
      length: 12,
      retain: false,
      dup: false
    }

    subscribe(t, s, 'hello', 1, () => {
      s.outStream.once('data', (delivery) => {
        t.same(delivery, expected, 'packet matches')
      })

      const outgoing = {
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 0
      }
      s.broker.publish(outgoing)
    })
  })
  // Writes a subscribe and a publish for the same topic back to back on one
  // connection. The first packet received is expected to be the puback; the
  // suback and the delivered publish are checked as they arrive afterwards.
  test('subscribe and publish QoS 1 in parallel', (t) => {
    t.plan(5)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const s = connect(setup(broker))

    const expected = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      dup: false,
      length: 14,
      retain: false
    }

    broker.on('clientError', (_client, err) => {
      console.log(err.stack)
      // t.fail('no client error')
    })

    const onLaterPacket = (packet) => {
      switch (packet.cmd) {
        case 'suback':
          t.same(packet.granted, [1])
          t.equal(packet.messageId, 24)
          break
        case 'publish':
          s.inStream.write({
            cmd: 'puback',
            messageId: packet.messageId
          })
          delete packet.messageId
          t.same(packet, expected, 'packet must match')
          break
      }
    }

    s.outStream.once('data', (first) => {
      t.equal(first.cmd, 'puback')
      t.equal(first.messageId, 42, 'messageId must match')
      s.outStream.on('data', onLaterPacket)
    })

    s.inStream.write({
      cmd: 'subscribe',
      messageId: 24,
      subscriptions: [{
        topic: 'hello',
        qos: 1
      }]
    })

    s.inStream.write({
      cmd: 'publish',
      topic: 'hello',
      payload: 'world',
      qos: 1,
      messageId: 42
    })
  })