basic.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. 'use strict'
  2. const { test } = require('tap')
  3. const eos = require('end-of-stream')
  4. const { setup, connect, subscribe, subscribeMultiple, noError } = require('./helper')
  5. const aedes = require('../')
  6. const proxyquire = require('proxyquire')
  7. test('test aedes.createBroker', function (t) {
  8. t.plan(1)
  9. const broker = aedes.createBroker()
  10. t.teardown(broker.close.bind(broker))
  11. connect(setup(broker), {}, function () {
  12. t.pass('connected')
  13. })
  14. })
  15. test('publish QoS 0', function (t) {
  16. t.plan(2)
  17. const s = connect(setup(), { clientId: 'my-client-xyz-5' })
  18. t.teardown(s.broker.close.bind(s.broker))
  19. const expected = {
  20. cmd: 'publish',
  21. topic: 'hello',
  22. payload: Buffer.from('world'),
  23. qos: 0,
  24. retain: false,
  25. dup: false,
  26. clientId: 'my-client-xyz-5'
  27. }
  28. s.broker.mq.on('hello', function (packet, cb) {
  29. expected.brokerId = s.broker.id
  30. expected.brokerCounter = s.broker.counter
  31. t.equal(packet.messageId, undefined, 'MUST not contain a packet identifier in QoS 0')
  32. t.same(packet, expected, 'packet matches')
  33. cb()
  34. })
  35. s.inStream.write({
  36. cmd: 'publish',
  37. topic: 'hello',
  38. payload: 'world'
  39. })
  40. })
  41. test('messageId shoud reset to 1 if it reached 65535', function (t) {
  42. t.plan(7)
  43. const s = connect(setup())
  44. t.teardown(s.broker.close.bind(s.broker))
  45. const publishPacket = {
  46. cmd: 'publish',
  47. topic: 'hello',
  48. payload: 'world',
  49. qos: 1,
  50. messageId: 42
  51. }
  52. let count = 0
  53. s.broker.on('clientReady', function (client) {
  54. subscribe(t, s, 'hello', 1, function () {
  55. client._nextId = 65535
  56. s.outStream.on('data', function (packet) {
  57. if (packet.cmd === 'puback') {
  58. t.equal(packet.messageId, 42)
  59. }
  60. if (packet.cmd === 'publish') {
  61. t.equal(packet.messageId, count++ === 0 ? 65535 : 1)
  62. }
  63. })
  64. s.inStream.write(publishPacket)
  65. s.inStream.write(publishPacket)
  66. })
  67. })
  68. })
  69. test('publish empty topic throws error', function (t) {
  70. t.plan(1)
  71. const s = connect(setup())
  72. t.teardown(s.broker.close.bind(s.broker))
  73. s.inStream.write({
  74. cmd: 'publish',
  75. topic: '',
  76. payload: 'world'
  77. })
  78. s.broker.on('clientError', function (client, err) {
  79. t.pass('should emit error')
  80. })
  81. })
  82. test('publish to $SYS topic throws error', function (t) {
  83. t.plan(1)
  84. const s = connect(setup())
  85. t.teardown(s.broker.close.bind(s.broker))
  86. s.inStream.write({
  87. cmd: 'publish',
  88. topic: '$SYS/not/allowed',
  89. payload: 'world'
  90. })
  91. s.broker.on('clientError', function (client, err) {
  92. t.pass('should emit error')
  93. })
  94. })
  95. ;[{ qos: 0, clean: false }, { qos: 0, clean: true }, { qos: 1, clean: false }, { qos: 1, clean: true }].forEach(function (ele) {
  96. test('subscribe a single topic in QoS ' + ele.qos + ' [clean=' + ele.clean + ']', function (t) {
  97. t.plan(5)
  98. const s = connect(setup(), { clean: ele.clean })
  99. t.teardown(s.broker.close.bind(s.broker))
  100. const expected = {
  101. cmd: 'publish',
  102. topic: 'hello',
  103. payload: Buffer.from('world'),
  104. dup: false,
  105. length: 12,
  106. qos: 0,
  107. retain: false
  108. }
  109. const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }]
  110. subscribe(t, s, 'hello', ele.qos, function () {
  111. s.outStream.once('data', function (packet) {
  112. t.same(packet, expected, 'packet matches')
  113. })
  114. s.broker.persistence.subscriptionsByClient(s.client, function (_, subs) {
  115. t.same(subs, expectedSubs)
  116. })
  117. s.broker.publish({
  118. cmd: 'publish',
  119. topic: 'hello',
  120. payload: 'world'
  121. })
  122. })
  123. })
  124. })
  125. // Catch invalid packet writeToStream errors
  126. test('return write errors to callback', function (t) {
  127. t.plan(1)
  128. const write = proxyquire('../lib/write.js', {
  129. 'mqtt-packet': {
  130. writeToStream: () => {
  131. throw Error('error')
  132. }
  133. }
  134. })
  135. const client = {
  136. conn: {
  137. writable: true
  138. },
  139. connecting: true
  140. }
  141. write(client, {}, function (err) {
  142. t.equal(err.message, 'packet received not valid', 'should return the error to callback')
  143. })
  144. })
  145. ;[{ qos: 0, clean: false }, { qos: 0, clean: true }, { qos: 1, clean: false }, { qos: 1, clean: true }].forEach(function (ele) {
  146. test('subscribe multipe topics in QoS ' + ele.qos + ' [clean=' + ele.clean + ']', function (t) {
  147. t.plan(5)
  148. const s = connect(setup(), { clean: ele.clean })
  149. t.teardown(s.broker.close.bind(s.broker))
  150. const expected = {
  151. cmd: 'publish',
  152. topic: 'hello',
  153. payload: Buffer.from('world'),
  154. dup: false,
  155. length: 12,
  156. qos: 0,
  157. retain: false
  158. }
  159. const subs = [
  160. { topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined },
  161. { topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }
  162. ]
  163. const expectedSubs = ele.clean ? null : subs
  164. subscribeMultiple(t, s, subs, [ele.qos, ele.qos], function () {
  165. s.outStream.on('data', function (packet) {
  166. t.same(packet, expected, 'packet matches')
  167. })
  168. s.broker.persistence.subscriptionsByClient(s.client, function (_, saveSubs) {
  169. t.same(saveSubs, expectedSubs)
  170. })
  171. s.broker.publish({
  172. cmd: 'publish',
  173. topic: 'hello',
  174. payload: 'world'
  175. })
  176. })
  177. })
  178. })
  179. test('does not die badly on connection error', function (t) {
  180. t.plan(3)
  181. const s = connect(setup())
  182. t.teardown(s.broker.close.bind(s.broker))
  183. s.inStream.write({
  184. cmd: 'subscribe',
  185. messageId: 42,
  186. subscriptions: [{
  187. topic: 'hello',
  188. qos: 0
  189. }]
  190. })
  191. s.broker.on('clientError', function (client, err) {
  192. t.ok(client, 'client is passed')
  193. t.ok(err, 'err is passed')
  194. })
  195. s.outStream.on('data', function (packet) {
  196. s.conn.destroy()
  197. s.broker.publish({
  198. cmd: 'publish',
  199. topic: 'hello',
  200. payload: Buffer.from('world')
  201. }, function () {
  202. t.pass('calls the callback')
  203. })
  204. })
  205. })
  206. // Guarded in mqtt-packet
  207. test('subscribe should have messageId', function (t) {
  208. t.plan(1)
  209. const s = connect(setup())
  210. t.teardown(s.broker.close.bind(s.broker))
  211. s.inStream.write({
  212. cmd: 'subscribe',
  213. subscriptions: [{
  214. topic: 'hello',
  215. qos: 0
  216. }]
  217. })
  218. s.broker.on('connectionError', function (client, err) {
  219. t.ok(err.message, 'Invalid messageId')
  220. })
  221. })
  222. test('subscribe with messageId 0 should return suback', function (t) {
  223. t.plan(1)
  224. const s = connect(setup())
  225. t.teardown(s.broker.close.bind(s.broker))
  226. s.inStream.write({
  227. cmd: 'subscribe',
  228. subscriptions: [{
  229. topic: 'hello',
  230. qos: 0
  231. }],
  232. messageId: 0
  233. })
  234. s.outStream.once('data', function (packet) {
  235. t.same(packet, {
  236. cmd: 'suback',
  237. messageId: 0,
  238. dup: false,
  239. length: 3,
  240. qos: 0,
  241. retain: false,
  242. granted: [
  243. 0
  244. ]
  245. }, 'packet matches')
  246. })
  247. })
  248. test('unsubscribe', function (t) {
  249. t.plan(5)
  250. const s = noError(connect(setup()), t)
  251. t.teardown(s.broker.close.bind(s.broker))
  252. subscribe(t, s, 'hello', 0, function () {
  253. s.inStream.write({
  254. cmd: 'unsubscribe',
  255. messageId: 43,
  256. unsubscriptions: ['hello']
  257. })
  258. s.outStream.once('data', function (packet) {
  259. t.same(packet, {
  260. cmd: 'unsuback',
  261. messageId: 43,
  262. dup: false,
  263. length: 2,
  264. qos: 0,
  265. retain: false
  266. }, 'packet matches')
  267. s.outStream.on('data', function (packet) {
  268. t.fail('packet received')
  269. })
  270. s.broker.publish({
  271. cmd: 'publish',
  272. topic: 'hello',
  273. payload: 'world'
  274. }, function () {
  275. t.pass('publish finished')
  276. })
  277. })
  278. })
  279. })
  280. test('unsubscribe without subscribe', function (t) {
  281. t.plan(1)
  282. const s = noError(connect(setup()), t)
  283. t.teardown(s.broker.close.bind(s.broker))
  284. s.inStream.write({
  285. cmd: 'unsubscribe',
  286. messageId: 43,
  287. unsubscriptions: ['hello']
  288. })
  289. s.outStream.once('data', function (packet) {
  290. t.same(packet, {
  291. cmd: 'unsuback',
  292. messageId: 43,
  293. dup: false,
  294. length: 2,
  295. qos: 0,
  296. retain: false
  297. }, 'packet matches')
  298. })
  299. })
  300. test('unsubscribe on disconnect for a clean=true client', function (t) {
  301. t.plan(6)
  302. const opts = { clean: true }
  303. const s = connect(setup(), opts)
  304. t.teardown(s.broker.close.bind(s.broker))
  305. subscribe(t, s, 'hello', 0, function () {
  306. s.conn.destroy(null, function () {
  307. t.pass('closed streams')
  308. })
  309. s.outStream.on('data', function () {
  310. t.fail('should not receive any more messages')
  311. })
  312. s.broker.once('unsubscribe', function () {
  313. t.pass('should emit unsubscribe')
  314. })
  315. s.broker.publish({
  316. cmd: 'publish',
  317. topic: 'hello',
  318. payload: Buffer.from('world')
  319. }, function () {
  320. t.pass('calls the callback')
  321. })
  322. })
  323. })
  324. test('unsubscribe on disconnect for a clean=false client', function (t) {
  325. t.plan(5)
  326. const opts = { clean: false }
  327. const s = connect(setup(), opts)
  328. t.teardown(s.broker.close.bind(s.broker))
  329. subscribe(t, s, 'hello', 0, function () {
  330. s.conn.destroy(null, function () {
  331. t.pass('closed streams')
  332. })
  333. s.outStream.on('data', function () {
  334. t.fail('should not receive any more messages')
  335. })
  336. s.broker.once('unsubscribe', function () {
  337. t.fail('should not emit unsubscribe')
  338. })
  339. s.broker.publish({
  340. cmd: 'publish',
  341. topic: 'hello',
  342. payload: Buffer.from('world')
  343. }, function () {
  344. t.pass('calls the callback')
  345. })
  346. })
  347. })
  348. test('disconnect', function (t) {
  349. t.plan(1)
  350. const s = noError(connect(setup()), t)
  351. t.teardown(s.broker.close.bind(s.broker))
  352. s.broker.on('clientDisconnect', function () {
  353. t.pass('closed stream')
  354. })
  355. s.inStream.write({
  356. cmd: 'disconnect'
  357. })
  358. })
  359. test('disconnect client on wrong cmd', function (t) {
  360. t.plan(1)
  361. const s = noError(connect(setup()), t)
  362. t.teardown(s.broker.close.bind(s.broker))
  363. s.broker.on('clientDisconnect', function () {
  364. t.pass('closed stream')
  365. })
  366. s.broker.on('clientReady', function (c) {
  367. // don't use stream write here because it will throw an error on mqtt_packet genetete
  368. c._parser.emit('packet', { cmd: 'pippo' })
  369. })
  370. })
  371. test('client closes', function (t) {
  372. t.plan(5)
  373. const broker = aedes()
  374. const client = noError(connect(setup(broker), { clientId: 'abcde' }))
  375. broker.on('clientReady', function () {
  376. const brokerClient = broker.clients.abcde
  377. t.equal(brokerClient.connected, true, 'client connected')
  378. eos(client.conn, t.pass.bind(t, 'client closes'))
  379. setImmediate(() => {
  380. brokerClient.close(function () {
  381. t.equal(broker.clients.abcde, undefined, 'client instance is removed')
  382. })
  383. t.equal(brokerClient.connected, false, 'client disconnected')
  384. broker.close(function (err) {
  385. t.error(err, 'no error')
  386. })
  387. })
  388. })
  389. })
  390. test('broker closes', function (t) {
  391. t.plan(4)
  392. const broker = aedes()
  393. const client = noError(connect(setup(broker), {
  394. clientId: 'abcde'
  395. }, function () {
  396. eos(client.conn, t.pass.bind(t, 'client closes'))
  397. broker.close(function (err) {
  398. t.error(err, 'no error')
  399. t.ok(broker.closed)
  400. t.equal(broker.clients.abcde, undefined, 'client instance is removed')
  401. })
  402. }))
  403. })
  404. test('broker closes gracefully', function (t) {
  405. t.plan(7)
  406. const broker = aedes()
  407. const client1 = noError(connect(setup(broker), {
  408. }, function () {
  409. const client2 = noError(connect(setup(broker), {
  410. }, function () {
  411. t.equal(broker.connectedClients, 2, '2 connected clients')
  412. eos(client1.conn, t.pass.bind(t, 'client1 closes'))
  413. eos(client2.conn, t.pass.bind(t, 'client2 closes'))
  414. broker.close(function (err) {
  415. t.error(err, 'no error')
  416. t.ok(broker.mq.closed, 'broker mq closes')
  417. t.ok(broker.closed, 'broker closes')
  418. t.equal(broker.connectedClients, 0, 'no connected clients')
  419. })
  420. }))
  421. }))
  422. })
  423. test('testing other event', function (t) {
  424. t.plan(1)
  425. const broker = aedes()
  426. t.teardown(broker.close.bind(broker))
  427. const client = setup(broker)
  428. broker.on('connectionError', function (client, error) {
  429. t.notOk(client.id, null)
  430. })
  431. client.conn.emit('error', 'Connect not yet arrived')
  432. })
  433. test('connect without a clientId for MQTT 3.1.1', function (t) {
  434. t.plan(1)
  435. const s = setup()
  436. t.teardown(s.broker.close.bind(s.broker))
  437. s.inStream.write({
  438. cmd: 'connect',
  439. protocolId: 'MQTT',
  440. protocolVersion: 4,
  441. clean: true,
  442. keepalive: 0
  443. })
  444. s.outStream.on('data', function (packet) {
  445. t.same(packet, {
  446. cmd: 'connack',
  447. returnCode: 0,
  448. length: 2,
  449. qos: 0,
  450. retain: false,
  451. dup: false,
  452. topic: null,
  453. payload: null,
  454. sessionPresent: false
  455. }, 'successful connack')
  456. })
  457. })
  458. test('disconnect existing client with the same clientId', function (t) {
  459. t.plan(2)
  460. const broker = aedes()
  461. t.teardown(broker.close.bind(broker))
  462. const c1 = connect(setup(broker), {
  463. clientId: 'abcde'
  464. }, function () {
  465. eos(c1.conn, function () {
  466. t.pass('first client disconnected')
  467. })
  468. connect(setup(broker), {
  469. clientId: 'abcde'
  470. }, function () {
  471. t.pass('second client connected')
  472. })
  473. })
  474. })
  475. test('disconnect if another broker connects the same clientId', function (t) {
  476. t.plan(2)
  477. const broker = aedes()
  478. t.teardown(broker.close.bind(broker))
  479. const c1 = connect(setup(broker), {
  480. clientId: 'abcde'
  481. }, function () {
  482. eos(c1.conn, function () {
  483. t.pass('disconnect first client')
  484. })
  485. broker.publish({
  486. topic: '$SYS/anotherBroker/new/clients',
  487. payload: Buffer.from('abcde')
  488. }, function () {
  489. t.pass('second client connects to another broker')
  490. })
  491. })
  492. })
  493. test('publish to $SYS/broker/new/clients', function (t) {
  494. t.plan(1)
  495. const broker = aedes()
  496. t.teardown(broker.close.bind(broker))
  497. broker.mq.on('$SYS/' + broker.id + '/new/clients', function (packet, done) {
  498. t.equal(packet.payload.toString(), 'abcde', 'clientId matches')
  499. done()
  500. })
  501. connect(setup(broker), {
  502. clientId: 'abcde'
  503. })
  504. })
  505. test('publish to $SYS/broker/new/subsribers and $SYS/broker/new/unsubsribers', function (t) {
  506. t.plan(7)
  507. const broker = aedes()
  508. t.teardown(broker.close.bind(broker))
  509. const sub = {
  510. topic: 'hello',
  511. qos: 0
  512. }
  513. broker.mq.on('$SYS/' + broker.id + '/new/subscribes', function (packet, done) {
  514. const payload = JSON.parse(packet.payload.toString())
  515. t.equal(payload.clientId, 'abcde', 'clientId matches')
  516. t.same(payload.subs, [sub], 'subscriptions matches')
  517. done()
  518. })
  519. broker.mq.on('$SYS/' + broker.id + '/new/unsubscribes', function (packet, done) {
  520. const payload = JSON.parse(packet.payload.toString())
  521. t.equal(payload.clientId, 'abcde', 'clientId matches')
  522. t.same(payload.subs, [sub.topic], 'unsubscriptions matches')
  523. done()
  524. })
  525. const subscriber = connect(setup(broker), {
  526. clean: false, clientId: 'abcde'
  527. }, function () {
  528. subscribe(t, subscriber, sub.topic, sub.qos, function () {
  529. subscriber.inStream.write({
  530. cmd: 'unsubscribe',
  531. messageId: 43,
  532. unsubscriptions: ['hello']
  533. })
  534. })
  535. })
  536. })
  537. test('restore QoS 0 subscriptions not clean', function (t) {
  538. t.plan(5)
  539. const broker = aedes()
  540. t.teardown(broker.close.bind(broker))
  541. const expected = {
  542. cmd: 'publish',
  543. topic: 'hello',
  544. payload: Buffer.from('world'),
  545. qos: 0,
  546. dup: false,
  547. length: 12,
  548. retain: false
  549. }
  550. let subscriber = connect(setup(broker), {
  551. clean: false, clientId: 'abcde'
  552. }, function () {
  553. subscribe(t, subscriber, 'hello', 0, function () {
  554. subscriber.inStream.end()
  555. const publisher = connect(setup(broker), {
  556. }, function () {
  557. subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
  558. t.equal(connect.sessionPresent, true, 'session present is set to true')
  559. publisher.inStream.write({
  560. cmd: 'publish',
  561. topic: 'hello',
  562. payload: 'world',
  563. qos: 0
  564. })
  565. })
  566. subscriber.outStream.once('data', function (packet) {
  567. t.same(packet, expected, 'packet must match')
  568. })
  569. })
  570. })
  571. })
  572. })
  573. test('do not restore QoS 0 subscriptions when clean', function (t) {
  574. t.plan(5)
  575. const broker = aedes()
  576. t.teardown(broker.close.bind(broker))
  577. let subscriber = connect(setup(broker), {
  578. clean: true, clientId: 'abcde'
  579. }, function () {
  580. subscribe(t, subscriber, 'hello', 0, function () {
  581. subscriber.inStream.end()
  582. subscriber.broker.persistence.subscriptionsByClient(broker.clients.abcde, function (_, subs, client) {
  583. t.equal(subs, null, 'no previous subscriptions restored')
  584. })
  585. const publisher = connect(setup(broker), {
  586. }, function () {
  587. subscriber = connect(setup(broker), {
  588. clean: true, clientId: 'abcde'
  589. }, function (connect) {
  590. t.equal(connect.sessionPresent, false, 'session present is set to false')
  591. publisher.inStream.write({
  592. cmd: 'publish',
  593. topic: 'hello',
  594. payload: 'world',
  595. qos: 0
  596. })
  597. })
  598. subscriber.outStream.once('data', function (packet) {
  599. t.fail('packet received')
  600. })
  601. })
  602. })
  603. })
  604. })
  605. test('double sub does not double deliver', function (t) {
  606. t.plan(7)
  607. const expected = {
  608. cmd: 'publish',
  609. topic: 'hello',
  610. payload: Buffer.from('world'),
  611. dup: false,
  612. length: 12,
  613. qos: 0,
  614. retain: false
  615. }
  616. const s = connect(setup(), {
  617. }, function () {
  618. subscribe(t, s, 'hello', 0, function () {
  619. subscribe(t, s, 'hello', 0, function () {
  620. s.outStream.once('data', function (packet) {
  621. t.same(packet, expected, 'packet matches')
  622. s.outStream.on('data', function () {
  623. t.fail('double deliver')
  624. })
  625. })
  626. s.broker.publish({
  627. cmd: 'publish',
  628. topic: 'hello',
  629. payload: 'world'
  630. })
  631. })
  632. })
  633. })
  634. t.teardown(s.broker.close.bind(s.broker))
  635. })
  636. test('overlapping sub does not double deliver', function (t) {
  637. t.plan(7)
  638. const expected = {
  639. cmd: 'publish',
  640. topic: 'hello',
  641. payload: Buffer.from('world'),
  642. dup: false,
  643. length: 12,
  644. qos: 0,
  645. retain: false
  646. }
  647. const s = connect(setup(), {
  648. }, function () {
  649. subscribe(t, s, 'hello', 0, function () {
  650. subscribe(t, s, 'hello/#', 0, function () {
  651. s.outStream.once('data', function (packet) {
  652. t.same(packet, expected, 'packet matches')
  653. s.outStream.on('data', function () {
  654. t.fail('double deliver')
  655. })
  656. })
  657. s.broker.publish({
  658. cmd: 'publish',
  659. topic: 'hello',
  660. payload: 'world'
  661. })
  662. })
  663. })
  664. })
  665. t.teardown(s.broker.close.bind(s.broker))
  666. })
  667. test('clear drain', function (t) {
  668. t.plan(4)
  669. const s = connect(setup(), {
  670. }, function () {
  671. subscribe(t, s, 'hello', 0, function () {
  672. // fake a busy socket
  673. s.conn.write = function (chunk, enc, cb) {
  674. return false
  675. }
  676. s.broker.publish({
  677. cmd: 'publish',
  678. topic: 'hello',
  679. payload: 'world'
  680. }, function () {
  681. t.pass('callback called')
  682. })
  683. s.conn.destroy()
  684. })
  685. })
  686. t.teardown(s.broker.close.bind(s.broker))
  687. })
  688. test('id option', function (t) {
  689. t.plan(2)
  690. const broker1 = aedes()
  691. setup(broker1).conn.destroy()
  692. t.ok(broker1.id, 'broker gets random id when id option not set')
  693. const broker2 = aedes({ id: 'abc' })
  694. setup(broker2).conn.destroy()
  695. t.equal(broker2.id, 'abc', 'broker id equals id option when set')
  696. t.teardown(() => {
  697. broker1.close()
  698. broker2.close()
  699. })
  700. })
  701. test('not duplicate client close when client error occurs', function (t) {
  702. t.plan(1)
  703. const broker = aedes()
  704. t.teardown(broker.close.bind(broker))
  705. connect(setup(broker))
  706. broker.on('client', function (client) {
  707. client.conn.on('drain', () => {
  708. t.pass('client closed ok')
  709. })
  710. client.close()
  711. // add back to test if there is duplicated close() call
  712. client.conn.on('drain', () => {
  713. t.fail('double client close calls')
  714. })
  715. })
  716. })
  717. test('not duplicate client close when double close() called', function (t) {
  718. t.plan(1)
  719. const broker = aedes()
  720. t.teardown(broker.close.bind(broker))
  721. connect(setup(broker))
  722. broker.on('clientReady', function (client) {
  723. client.conn.on('drain', () => {
  724. t.pass('client closed ok')
  725. })
  726. client.close()
  727. // add back to test if there is duplicated close() call
  728. client.conn.on('drain', () => {
  729. t.fail('double execute client close function')
  730. })
  731. client.close()
  732. })
  733. })