client-pub-sub.js 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027
  1. 'use strict'
  2. const { test } = require('tap')
  3. const { setup, connect, subscribe, noError } = require('./helper')
  4. const aedes = require('../')
  // Direct publish at QoS 0: when the broker emits 'client', publish straight
  // to that client and compare the first packet seen on its outgoing stream.
  test('publish direct to a single client QoS 0', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 12,
      qos: 0,
      retain: false
    }

    broker.on('client', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // Direct publish at QoS 1 on a non-clean session with a failing
  // persistence.outgoingEnqueue stub: the publish callback is expected to
  // receive the stub's error.
  test('publish direct to a single client throws error', (t) => {
    t.plan(1)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    // Force the enqueue step to fail.
    broker.persistence.outgoingEnqueue = function (sub, packet, done) {
      done(new Error('Throws error'))
    }

    broker.on('client', (client) => {
      client.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 1,
        retain: false
      }, (err) => {
        // Was `t.pass(...)`, which succeeds unconditionally and never looked
        // at `err`; compare the message so the test can actually fail.
        t.equal(err.message, 'Throws error', 'throws error')
      })
    })

    connect(setup(broker), { clean: false })
  })
  // Direct publish at QoS 1 on a non-clean session with a failing
  // persistence.outgoingUpdate stub: the client is expected to emit an 'error'
  // event carrying the stub's error.
  test('publish direct to a single client throws error 2', (t) => {
    t.plan(1)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    // Force the update step to fail while still forwarding client and packet.
    broker.persistence.outgoingUpdate = function (client, packet, done) {
      done(new Error('Throws error'), client, packet)
    }

    broker.on('client', (client) => {
      client.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 1,
        retain: false
      }, () => {})

      client.once('error', (err) => {
        // Was `t.pass(...)`, which succeeds unconditionally and never looked
        // at `err`; compare the message so the test can actually fail.
        t.equal(err.message, 'Throws error', 'throws error')
      })
    })

    connect(setup(broker), { clean: false })
  })
  // Direct publish at QoS 1: check the first outgoing packet (adopting
  // whatever messageId it carries) and answer it with a PUBACK.
  test('publish direct to a single client QoS 1', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 14,
      qos: 1,
      retain: false
    }

    broker.on('client', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 1
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      // The id is not known up front, so copy it before comparing.
      wanted.messageId = packet.messageId
      t.same(packet, wanted, 'packet matches')

      const puback = { cmd: 'puback', messageId: packet.messageId }
      stream.inStream.write(puback)
    })
  })
  // QoS 2 publish where persistence.outgoingUpdate is swapped for a failing
  // stub right after PUBREC is written; the test expects a 'clientError' event.
  test('publish QoS 2 throws error in pubrel', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const stream = connect(setup(broker))

    broker.on('clientError', () => {
      t.pass('throws error')
    })

    stream.outStream.on('data', (packet) => {
      if (packet.cmd !== 'publish') {
        return
      }

      stream.inStream.write({
        cmd: 'pubrec',
        messageId: packet.messageId
      })

      // Installed only after the PUBREC has been written.
      stream.broker.persistence.outgoingUpdate = (client, pubrel, cb) => {
        cb(new Error('error'))
      }
    })

    broker.on('clientReady', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })
  })
  // Direct publish at QoS 2: reply PUBREC to the publish, PUBCOMP to anything
  // else, then destroy the inbound stream and check one packet of each kind
  // was seen.
  test('publish direct to a single client QoS 2', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    let publishes = 0
    let others = 0

    broker.on('clientReady', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
      client.on('error', (err) => {
        t.error(err)
      })
    })

    const stream = connect(setup(broker))

    stream.inStream.on('close', () => {
      t.equal(publishes, 1)
      t.equal(others, 1)
    })

    stream.outStream.on('data', (packet) => {
      const { messageId } = packet

      if (packet.cmd === 'publish') {
        publishes++
        stream.inStream.write({ cmd: 'pubrec', messageId })
        return
      }

      others++
      stream.inStream.write({ cmd: 'pubcomp', messageId })
      stream.inStream.destroy()
    })
  })
  // Non-clean session, QoS 1: after the client answers with PUBACK the broker's
  // 'ack' event is expected to carry the originally published packet.
  test('emit a `ack` event on PUBACK for QoS 1 [clean=false]', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      retain: false,
      dup: false
    }

    broker.on('clientReady', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 1
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    broker.once('ack', (packet) => {
      // Broker-assigned fields are copied over before the comparison.
      const { brokerId, brokerCounter, messageId } = packet
      Object.assign(wanted, { brokerId, brokerCounter, messageId })

      t.same(packet, wanted, 'ack packet is origianl packet')
      t.pass('got the ack event')
    })

    const stream = connect(setup(broker), { clean: false })
    stream.outStream.once('data', (packet) => {
      stream.inStream.write({
        cmd: 'puback',
        messageId: packet.messageId
      })
    })
  })
  // Clean session, QoS 1: the 'ack' event is still expected after PUBACK, but
  // with an undefined packet argument.
  test('emit a `ack` event on PUBACK for QoS 1 [clean=true]', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('clientReady', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 1
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    broker.once('ack', (packet) => {
      t.equal(packet, undefined, 'ack packet is undefined')
      t.pass('got the ack event')
    })

    const stream = connect(setup(broker), { clean: true })
    stream.outStream.once('data', (packet) => {
      stream.inStream.write({
        cmd: 'puback',
        messageId: packet.messageId
      })
    })
  })
  // Non-clean session, QoS 2: after the PUBREC/PUBCOMP exchange the 'ack' event
  // is expected to carry the pubrel packet and the publishing client.
  test('emit a `ack` event on PUBCOMP for QoS 2 [clean=false]', (t) => {
    t.plan(5)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    let seenMessageId
    let readyClientId

    broker.on('clientReady', (client) => {
      readyClientId = client.id
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    broker.once('ack', (packet, client) => {
      t.equal(client.id, readyClientId)
      t.equal(packet.messageId, seenMessageId)
      t.equal(packet.cmd, 'pubrel', 'ack packet is purel')
      t.pass('got the ack event')
    })

    const stream = connect(setup(broker), { clean: false })
    stream.outStream.on('data', (packet) => {
      if (packet.cmd === 'publish') {
        stream.inStream.write({
          cmd: 'pubrec',
          messageId: packet.messageId
        })
        return
      }

      // Remember the id of the non-publish packet for the 'ack' assertions.
      seenMessageId = packet.messageId
      stream.inStream.write({
        cmd: 'pubcomp',
        messageId: packet.messageId
      })
    })
  })
  // Clean session, QoS 2: the 'ack' event is expected after the PUBREC/PUBCOMP
  // exchange, with an undefined packet argument.
  test('emit a `ack` event on PUBCOMP for QoS 2 [clean=true]', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('clientReady', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 2
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    broker.once('ack', (packet) => {
      t.equal(packet, undefined, 'ack packet is undefined')
      t.pass('got the ack event')
    })

    const stream = connect(setup(broker), { clean: true })
    stream.outStream.on('data', (packet) => {
      // PUBREC answers the publish; every other packet gets a PUBCOMP.
      const reply = packet.cmd === 'publish' ? 'pubrec' : 'pubcomp'
      stream.inStream.write({
        cmd: reply,
        messageId: packet.messageId
      })
    })
  })
  // Non-clean session with a fixed client id: the QoS 1 publish is never acked
  // on the first connection, so the test reconnects with the same options and
  // checks the packet received on the second connection.
  test('offline message support for direct publish', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 14,
      qos: 1,
      retain: false
    }
    const sessionOpts = {
      clean: false,
      clientId: 'abcde'
    }

    // Publish only for the first connection.
    broker.once('client', (client) => {
      const message = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 1
      }
      client.publish(message, (err) => {
        t.error(err, 'no error')
      })
    })

    let stream = connect(setup(broker), sessionOpts)
    stream.outStream.once('data', () => {
      // First delivery is ignored; reconnect without acknowledging it.
      stream = connect(setup(broker), sessionOpts)
      stream.outStream.once('data', (packet) => {
        // The PUBACK is written on a third, fresh connection.
        stream = connect(setup(broker), sessionOpts)
        stream.inStream.write({
          cmd: 'puback',
          messageId: packet.messageId
        })
        delete packet.messageId
        t.same(packet, wanted, 'packet must match')
      })
    })
  })
  // Subscribe the client from the broker side, publish through the broker and
  // expect the client to receive the packet.
  test('subscribe a client programmatically', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 12,
      qos: 0,
      retain: false
    }

    const publishHello = () => {
      broker.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0
      }, (err) => {
        t.error(err, 'no error')
      })
    }

    broker.on('client', (client) => {
      client.subscribe({ topic: 'hello', qos: 0 }, (err) => {
        t.error(err, 'no error')
        publishHello()
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // Same flow as a plain programmatic subscribe, but the broker publishes with
  // retain: true and the delivered packet is expected to have retain: false.
  test('subscribe a client programmatically clears retain', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 12,
      qos: 0,
      retain: false
    }

    const publishRetained = () => {
      broker.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0,
        retain: true
      }, (err) => {
        t.error(err, 'no error')
      })
    }

    broker.on('client', (client) => {
      client.subscribe({ topic: 'hello', qos: 0 }, (err) => {
        t.error(err, 'no error')
        publishRetained()
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // Subscription made with rap: true; the broker publishes with retain: true
  // and the delivered packet is expected to keep retain: true.
  test('subscribe a bridge programmatically keeps retain', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 12,
      qos: 0,
      retain: true
    }

    const publishRetained = () => {
      broker.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0,
        retain: true
      }, (err) => {
        t.error(err, 'no error')
      })
    }

    broker.on('client', (client) => {
      const subscription = { topic: 'hello', qos: 0, rap: true }
      client.subscribe(subscription, (err) => {
        t.error(err, 'no error')
        publishRetained()
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // After a QoS 1 subscription succeeds, the client is flagged as neither
  // connected nor connecting; the following QoS 1 broker publish is expected to
  // surface a 'connection closed' clientError.
  test('subscribe throws error when QoS > 0', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('clientReady', (client) => {
      client.subscribe({ topic: 'hello', qos: 1 }, (err) => {
        t.error(err, 'no error')

        // makes writeQos throw error
        client.connected = false
        client.connecting = false

        const message = {
          topic: 'hello',
          payload: Buffer.from('world'),
          qos: 1
        }
        broker.publish(message, (err) => {
          t.error(err, 'no error')
        })
      })
    })

    broker.on('clientError', (client, error) => {
      t.equal(error.message, 'connection closed', 'should throw clientError')
    })

    connect(setup(broker))
  })
  // Programmatic subscription to '+/world/1'; a publish on 'hello/world/1' is
  // expected to reach the client.
  test('subscribe a client programmatically - wildcard', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello/world/1',
      payload: Buffer.from('world'),
      dup: false,
      length: 20,
      qos: 0,
      retain: false
    }

    const publishConcrete = () => {
      broker.publish({
        topic: 'hello/world/1',
        payload: Buffer.from('world'),
        qos: 0
      }, (err) => {
        t.error(err, 'no error')
      })
    }

    broker.on('clientReady', (client) => {
      client.subscribe({ topic: '+/world/1', qos: 0 }, (err) => {
        t.error(err, 'no error')
        publishConcrete()
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // Subscribe, then unsubscribe with an array of subscriptions; both callbacks
  // are expected to report no error.
  test('unsubscribe a client', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('client', (client) => {
      client.subscribe({ topic: 'hello', qos: 0 }, (err) => {
        t.error(err, 'no error')

        const unsubscriptions = [{ topic: 'hello', qos: 0 }]
        client.unsubscribe(unsubscriptions, (err) => {
          t.error(err, 'no error')
        })
      })
    })

    connect(setup(broker))
  })
  // Clean session: persistence.removeSubscriptions is replaced by a stub that
  // always errors, so a clean unsubscribe callback shows it was not used.
  test('unsubscribe should not call removeSubscriptions when [clean=true]', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.persistence.removeSubscriptions = (client, subs, cb) => {
      cb(new Error('remove subscription is called'))
    }

    broker.on('client', (client) => {
      client.subscribe({ topic: 'hello', qos: 1 }, (err) => {
        t.error(err, 'no error')

        const unsubscribePacket = {
          unsubscriptions: [{
            topic: 'hello',
            qos: 1
          }],
          messageId: 42
        }
        client.unsubscribe(unsubscribePacket, (err) => {
          t.error(err, 'no error')
        })
      })
    })

    connect(setup(broker), { clean: true })
  })
  // After subscribing, broker.unsubscribe is replaced with a failing stub; the
  // test only checks that the client.unsubscribe callback is invoked.
  test('unsubscribe throws error', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('client', (client) => {
      client.subscribe({ topic: 'hello', qos: 0 }, (err) => {
        t.error(err, 'no error')

        broker.unsubscribe = (topic, func, done) => {
          done(new Error('error'))
        }

        client.unsubscribe({ topic: 'hello', qos: 0 }, () => {
          t.pass('throws error')
        })
      })
    })

    connect(setup(broker))
  })
  // After subscribing, persistence.removeSubscriptions is replaced with a
  // failing stub; the test only checks that the unsubscribe callback is invoked.
  test('unsubscribe throws error 2', (t) => {
    t.plan(2)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('client', (client) => {
      client.subscribe({ topic: 'hello', qos: 2 }, (err) => {
        t.error(err, 'no error')

        broker.persistence.removeSubscriptions = (client, unsubscriptions, done) => {
          done(new Error('error'))
        }

        const unsubscribePacket = {
          unsubscriptions: [{
            topic: 'hello',
            qos: 2
          }],
          messageId: 42
        }
        client.unsubscribe(unsubscribePacket, () => {
          t.pass('throws error')
        })
      })
    })

    connect(setup(broker))
  })
  // Programmatic subscription given as an array of two topics; a publish on
  // 'hello' is expected to reach the client.
  test('subscribe a client programmatically multiple topics', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 12,
      qos: 0,
      retain: false
    }

    const publishHello = () => {
      broker.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0
      }, (err) => {
        t.error(err, 'no error')
      })
    }

    broker.on('client', (client) => {
      const subscriptions = [
        { topic: 'hello', qos: 0 },
        { topic: 'aaa', qos: 0 }
      ]
      client.subscribe(subscriptions, (err) => {
        t.error(err, 'no error')
        publishHello()
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // Programmatic subscription given as a packet-shaped object with a
  // `subscriptions` array; a publish on 'hello' is expected to reach the client.
  test('subscribe a client programmatically with full packet', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      dup: false,
      length: 12,
      qos: 0,
      retain: false
    }

    const publishHello = () => {
      broker.publish({
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0
      }, (err) => {
        t.error(err, 'no error')
      })
    }

    broker.on('client', (client) => {
      const subscribePacket = {
        subscriptions: [
          { topic: 'hello', qos: 0 },
          { topic: 'aaa', qos: 0 }
        ]
      }
      client.subscribe(subscribePacket, (err) => {
        t.error(err, 'no error')
        publishHello()
      })
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', (packet) => {
      t.same(packet, wanted, 'packet matches')
    })
  })
  // The client subscribes itself to '$SYS/+/new/clients' and expects a packet
  // whose payload is its own client id.
  test('get message when client connects', (t) => {
    t.plan(2)

    const ownId = 'gav'
    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('client', (client) => {
      const subscribePacket = {
        subscriptions: [{
          topic: '$SYS/+/new/clients',
          qos: 0
        }]
      }
      client.subscribe(subscribePacket, (err) => {
        t.error(err, 'no error')
      })
    })

    const stream = connect(setup(broker), { clientId: ownId })
    stream.outStream.on('data', (packet) => {
      t.equal(ownId, packet.payload.toString())
    })
  })
  // The first client subscribes to '$SYS/+/disconnect/clients'; the second is
  // closed as soon as it appears, and the first expects a packet whose payload
  // is the second client's id.
  test('get message when client disconnects', (t) => {
    t.plan(2)

    const watcherId = 'gav'
    const leaverId = 'friend'
    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.on('client', (client) => {
      if (client.id !== watcherId) {
        client.close()
        return
      }

      const subscribePacket = {
        subscriptions: [{
          topic: '$SYS/+/disconnect/clients',
          qos: 0
        }]
      }
      client.subscribe(subscribePacket, (err) => {
        t.error(err, 'no error')
      })
    })

    const watcher = connect(setup(broker), { clientId: watcherId })
    connect(setup(broker), { clientId: leaverId })

    watcher.outStream.on('data', (packet) => {
      t.equal(leaverId, packet.payload.toString())
    })
  })
  // authorizeSubscribe answers with a null subscription; a retained message is
  // published first, then the client subscribes to the same topic twice. The
  // client must not receive any packet.
  test('should not receive a message on negated subscription', (t) => {
    t.plan(4)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    broker.authorizeSubscribe = (client, sub, callback) => {
      callback(null, null)
    }

    broker.on('client', (client) => {
      const retained = {
        topic: 'hello',
        payload: Buffer.from('world'),
        qos: 0,
        retain: true
      }
      broker.publish(retained, (err) => {
        t.error(err, 'no error')

        const duplicated = [
          { topic: 'hello', qos: 0 },
          { topic: 'hello', qos: 0 }
        ]
        client.subscribe(duplicated, (err) => {
          t.error(err, 'no error')
        })
      })
    })

    broker.on('subscribe', (subs) => {
      // NOTE(review): t.pass() always succeeds, so these two calls only count
      // towards the plan and do not compare subs.length or subs[0].qos with
      // anything. Confirm the intended values before switching to t.equal.
      t.pass(subs.length, 1, 'Should dedupe subs')
      t.pass(subs[0].qos, 128, 'Qos should be 128 (Fail)')
    })

    const stream = connect(setup(broker))
    stream.outStream.once('data', () => {
      t.fail('Packet should not be received')
    })
  })
  // A client subscribes to 'hello' over the wire, then a broker-level
  // subscriber function is added for the same topic. One publish from the
  // client is expected both on the client's outgoing stream and in the custom
  // function.
  test('programmatically add custom subscribe', (t) => {
    t.plan(6)

    const broker = aedes({ clientId: 'my-client-xyz-7' })
    t.teardown(broker.close.bind(broker))

    const stream = connect(setup(broker), { clientId: 'my-client-xyz-7' })

    // Packet expected on the client's outgoing stream.
    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 0,
      retain: false,
      length: 12,
      dup: false
    }
    // Packet expected by the broker-level subscriber.
    const wantedDelivery = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 0,
      retain: false,
      dup: false,
      clientId: 'my-client-xyz-7'
    }

    function deliver (packet, cb) {
      // Broker id and counter are read when the packet is delivered.
      wantedDelivery.brokerId = stream.broker.id
      wantedDelivery.brokerCounter = stream.broker.counter
      t.same(packet, wantedDelivery, 'packet matches')
      cb()
    }

    subscribe(t, stream, 'hello', 0, () => {
      broker.subscribe('hello', deliver, () => {
        t.pass('subscribed')
      })

      stream.outStream.on('data', (packet) => {
        t.same(packet, wanted, 'packet matches')
      })

      stream.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 0,
        messageId: 42
      })
    })
  })
  // A broker-level subscriber function is added once the client is connected;
  // the client's QoS 1 publish is checked in the subscriber and in the broker's
  // 'publish' event.
  test('custom function in broker.subscribe', (t) => {
    t.plan(4)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const stream = setup(broker)

    const wanted = {
      cmd: 'publish',
      topic: 'hello',
      payload: Buffer.from('world'),
      qos: 1,
      retain: false,
      dup: false,
      messageId: undefined,
      clientId: 'my-client-xyz-6'
    }

    function deliver (packet, cb) {
      // Broker id and counter are read when the packet is delivered.
      wanted.brokerId = stream.broker.id
      wanted.brokerCounter = stream.broker.counter
      t.same(packet, wanted, 'packet matches')
      cb()
    }

    connect(stream, { clientId: 'my-client-xyz-6' }, () => {
      broker.subscribe('hello', deliver, () => {
        t.pass('subscribed')
      })

      stream.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'world',
        qos: 1,
        messageId: 42
      })
    })

    broker.on('publish', (packet, client) => {
      // Only publishes that come with a client are checked.
      if (!client) {
        return
      }
      t.equal(packet.topic, 'hello')
      t.equal(packet.messageId, 42)
    })
  })
  // A broker-level subscriber is added and removed again; a later publish from
  // the client must trigger the broker's 'publish' event but never the removed
  // subscriber.
  test('custom function in broker.unsubscribe', (t) => {
    t.plan(3)

    const broker = aedes()
    t.teardown(broker.close.bind(broker))

    const stream = noError(setup(broker))

    function deliver (packet, cb) {
      t.fail('should not be called')
      cb()
    }

    const publishFromClient = () => {
      stream.inStream.write({
        cmd: 'publish',
        topic: 'hello',
        payload: 'word',
        qos: 1,
        messageId: 42
      })
    }

    connect(stream, {}, () => {
      broker.subscribe('hello', deliver, () => {
        t.pass('subscribed')
        broker.unsubscribe('hello', deliver, () => {
          t.pass('unsubscribe')
          publishFromClient()
        })
      })
    })

    broker.on('publish', (packet, client) => {
      // Only publishes that come with a client are counted.
      if (client) {
        t.pass('publish')
      }
    })
  })