will.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. 'use strict'
  2. const { test } = require('tap')
  3. const memory = require('aedes-persistence')
  4. const Faketimers = require('@sinonjs/fake-timers')
  5. const { setup, connect, noError } = require('./helper')
  6. const aedes = require('../')
  7. function willConnect (s, opts, connected) {
  8. opts = opts || {}
  9. opts.will = {
  10. topic: 'mywill',
  11. payload: Buffer.from('last will'),
  12. qos: 0,
  13. retain: false
  14. }
  15. return connect(s, opts, connected)
  16. }
  17. test('delivers a will', function (t) {
  18. t.plan(4)
  19. const opts = {}
  20. // willConnect populates opts with a will
  21. const s = willConnect(setup(),
  22. opts,
  23. function () {
  24. s.conn.destroy()
  25. }
  26. )
  27. t.teardown(s.broker.close.bind(s.broker))
  28. s.broker.mq.on('mywill', function (packet, cb) {
  29. t.equal(packet.topic, opts.will.topic, 'topic matches')
  30. t.same(packet.payload, opts.will.payload, 'payload matches')
  31. t.equal(packet.qos, opts.will.qos, 'qos matches')
  32. t.equal(packet.retain, opts.will.retain, 'retain matches')
  33. cb()
  34. })
  35. })
  36. test('calling close two times should not deliver two wills', function (t) {
  37. t.plan(4)
  38. const opts = {}
  39. const broker = aedes()
  40. t.teardown(broker.close.bind(broker))
  41. broker.on('client', function (client) {
  42. client.close()
  43. client.close()
  44. })
  45. broker.mq.on('mywill', onWill)
  46. // willConnect populates opts with a will
  47. willConnect(setup(broker), opts)
  48. function onWill (packet, cb) {
  49. broker.mq.removeListener('mywill', onWill, function () {
  50. broker.mq.on('mywill', function (packet) {
  51. t.fail('the will must be delivered only once')
  52. })
  53. })
  54. t.equal(packet.topic, opts.will.topic, 'topic matches')
  55. t.same(packet.payload, opts.will.payload, 'payload matches')
  56. t.equal(packet.qos, opts.will.qos, 'qos matches')
  57. t.equal(packet.retain, opts.will.retain, 'retain matches')
  58. cb()
  59. }
  60. })
  61. test('delivers old will in case of a crash', function (t) {
  62. t.plan(8)
  63. const persistence = memory()
  64. const will = {
  65. topic: 'mywill',
  66. payload: Buffer.from('last will'),
  67. qos: 0,
  68. retain: false
  69. }
  70. persistence.broker = {
  71. id: 'anotherBroker'
  72. }
  73. persistence.putWill({
  74. id: 'myClientId42'
  75. }, will, function (err) {
  76. t.error(err, 'no error')
  77. let authorized = false
  78. const interval = 10 // ms, so that the will check happens fast!
  79. const broker = aedes({
  80. persistence,
  81. heartbeatInterval: interval,
  82. authorizePublish: function (client, packet, callback) {
  83. t.strictSame(client, null, 'client must be null')
  84. authorized = true
  85. callback(null)
  86. }
  87. })
  88. t.teardown(broker.close.bind(broker))
  89. const start = Date.now()
  90. broker.mq.on('mywill', check)
  91. function check (packet, cb) {
  92. broker.mq.removeListener('mywill', check, function () {
  93. broker.mq.on('mywill', function (packet) {
  94. t.fail('the will must be delivered only once')
  95. })
  96. })
  97. t.ok(Date.now() - start >= 3 * interval, 'the will needs to be emitted after 3 heartbeats')
  98. t.equal(packet.topic, will.topic, 'topic matches')
  99. t.same(packet.payload, will.payload, 'payload matches')
  100. t.equal(packet.qos, will.qos, 'qos matches')
  101. t.equal(packet.retain, will.retain, 'retain matches')
  102. t.equal(authorized, true, 'authorization called')
  103. cb()
  104. }
  105. })
  106. })
  107. test('deliver old will without authorization in case of a crash', function (t) {
  108. t.plan(2)
  109. const persistence = memory()
  110. const will = {
  111. topic: 'mywill',
  112. payload: Buffer.from('last will'),
  113. qos: 0,
  114. retain: false
  115. }
  116. persistence.broker = {
  117. id: 'anotherBroker'
  118. }
  119. persistence.putWill({
  120. id: 'myClientId42'
  121. }, will, function (err) {
  122. t.error(err, 'no error')
  123. const interval = 10 // ms, so that the will check happens fast!
  124. const broker = aedes({
  125. persistence,
  126. heartbeatInterval: interval,
  127. authorizePublish: function (client, packet, callback) {
  128. t.strictSame(client, null, 'client must be null')
  129. callback(new Error())
  130. }
  131. })
  132. t.teardown(broker.close.bind(broker))
  133. broker.mq.on('mywill', check)
  134. function check (packet, cb) {
  135. t.fail('received will without authorization')
  136. cb()
  137. }
  138. })
  139. })
  140. test('delete old broker', function (t) {
  141. t.plan(1)
  142. const clock = Faketimers.install()
  143. const heartbeatInterval = 100
  144. const broker = aedes({
  145. heartbeatInterval
  146. })
  147. t.teardown(broker.close.bind(broker))
  148. const brokerId = 'dummyBroker'
  149. broker.brokers[brokerId] = Date.now() - heartbeatInterval * 3.5
  150. setTimeout(() => {
  151. t.equal(broker.brokers[brokerId], undefined, 'Broker deleted')
  152. }, heartbeatInterval * 4)
  153. clock.tick(heartbeatInterval * 4)
  154. clock.uninstall()
  155. })
  156. test('store the will in the persistence', function (t) {
  157. t.plan(5)
  158. const opts = {
  159. clientId: 'abcde'
  160. }
  161. // willConnect populates opts with a will
  162. const s = willConnect(setup(), opts)
  163. t.teardown(s.broker.close.bind(s.broker))
  164. s.broker.on('client', function () {
  165. // this is connack
  166. s.broker.persistence.getWill({
  167. id: opts.clientId
  168. }, function (err, packet) {
  169. t.error(err, 'no error')
  170. t.same(packet.topic, opts.will.topic, 'will topic matches')
  171. t.same(packet.payload, opts.will.payload, 'will payload matches')
  172. t.same(packet.qos, opts.will.qos, 'will qos matches')
  173. t.same(packet.retain, opts.will.retain, 'will retain matches')
  174. })
  175. })
  176. })
  177. test('delete the will in the persistence after publish', function (t) {
  178. t.plan(2)
  179. const opts = {
  180. clientId: 'abcde'
  181. }
  182. const broker = aedes()
  183. t.teardown(broker.close.bind(broker))
  184. broker.on('client', function (client) {
  185. setImmediate(function () {
  186. client.close()
  187. })
  188. })
  189. broker.mq.on('mywill', check)
  190. // willConnect populates opts with a will
  191. willConnect(setup(broker), opts)
  192. function check (packet, cb) {
  193. broker.mq.removeListener('mywill', check, function () {
  194. broker.persistence.getWill({
  195. id: opts.clientId
  196. }, function (err, p) {
  197. t.error(err, 'no error')
  198. t.notOk(p, 'packet is empty')
  199. })
  200. })
  201. cb()
  202. }
  203. })
  204. test('delivers a will with authorization', function (t) {
  205. t.plan(6)
  206. let authorized = false
  207. const opts = {}
  208. // willConnect populates opts with a will
  209. const s = willConnect(
  210. setup(aedes({
  211. authorizePublish: (client, packet, callback) => {
  212. authorized = true
  213. callback(null)
  214. }
  215. })),
  216. opts,
  217. function () {
  218. s.conn.destroy()
  219. }
  220. )
  221. t.teardown(s.broker.close.bind(s.broker))
  222. s.broker.on('clientDisconnect', function (client) {
  223. t.equal(client.connected, false)
  224. })
  225. s.broker.mq.on('mywill', function (packet, cb) {
  226. t.equal(packet.topic, opts.will.topic, 'topic matches')
  227. t.same(packet.payload, opts.will.payload, 'payload matches')
  228. t.equal(packet.qos, opts.will.qos, 'qos matches')
  229. t.equal(packet.retain, opts.will.retain, 'retain matches')
  230. t.equal(authorized, true, 'authorization called')
  231. cb()
  232. })
  233. })
  234. test('delivers a will waits for authorization', function (t) {
  235. t.plan(6)
  236. let authorized = false
  237. const opts = {}
  238. // willConnect populates opts with a will
  239. const s = willConnect(
  240. setup(aedes({
  241. authorizePublish: (client, packet, callback) => {
  242. authorized = true
  243. setImmediate(() => { callback(null) })
  244. }
  245. })),
  246. opts,
  247. function () {
  248. s.conn.destroy()
  249. }
  250. )
  251. t.teardown(s.broker.close.bind(s.broker))
  252. s.broker.on('clientDisconnect', function () {
  253. t.pass('client is disconnected')
  254. })
  255. s.broker.mq.on('mywill', function (packet, cb) {
  256. t.equal(packet.topic, opts.will.topic, 'topic matches')
  257. t.same(packet.payload, opts.will.payload, 'payload matches')
  258. t.equal(packet.qos, opts.will.qos, 'qos matches')
  259. t.equal(packet.retain, opts.will.retain, 'retain matches')
  260. t.equal(authorized, true, 'authorization called')
  261. cb()
  262. })
  263. })
  264. test('does not deliver a will without authorization', function (t) {
  265. t.plan(1)
  266. let authorized = false
  267. const opts = {}
  268. // willConnect populates opts with a will
  269. const s = willConnect(
  270. setup(aedes({
  271. authorizePublish: (username, packet, callback) => {
  272. authorized = true
  273. callback(new Error())
  274. }
  275. })),
  276. opts,
  277. function () {
  278. s.conn.destroy()
  279. }
  280. )
  281. t.teardown(s.broker.close.bind(s.broker))
  282. s.broker.on('clientDisconnect', function () {
  283. t.equal(authorized, true, 'authorization called')
  284. })
  285. s.broker.mq.on('mywill', function (packet, cb) {
  286. t.fail('received will without authorization')
  287. cb()
  288. })
  289. })
  290. test('does not deliver a will without authentication', function (t) {
  291. t.plan(1)
  292. let authenticated = false
  293. const opts = {}
  294. // willConnect populates opts with a will
  295. const s = willConnect(
  296. setup(aedes({
  297. authenticate: (client, username, password, callback) => {
  298. authenticated = true
  299. callback(new Error(), false)
  300. }
  301. })),
  302. opts
  303. )
  304. t.teardown(s.broker.close.bind(s.broker))
  305. s.broker.on('clientError', function () {
  306. t.equal(authenticated, true, 'authentication called')
  307. t.end()
  308. })
  309. s.broker.mq.on('mywill', function (packet, cb) {
  310. t.fail('received will without authentication')
  311. cb()
  312. })
  313. })
  314. test('does not deliver will if broker is closed during authentication', function (t) {
  315. t.plan(0)
  316. const opts = { keepalive: 1 }
  317. const broker = aedes({
  318. authenticate: function (client, username, password, callback) {
  319. setTimeout(function () {
  320. callback(null, true)
  321. })
  322. broker.close()
  323. }
  324. })
  325. broker.on('keepaliveTimeout', function () {
  326. t.fail('keepalive timer shoud not be set')
  327. })
  328. broker.mq.on('mywill', function (packet, cb) {
  329. t.fail('Received will when it was not expected')
  330. cb()
  331. })
  332. willConnect(setup(broker), opts)
  333. })
  334. // [MQTT-3.14.4-3]
  335. test('does not deliver will when client sends a DISCONNECT', function (t) {
  336. t.plan(0)
  337. const broker = aedes()
  338. t.teardown(broker.close.bind(broker))
  339. const s = noError(willConnect(setup(broker), {}, function () {
  340. s.inStream.end({
  341. cmd: 'disconnect'
  342. })
  343. }), t)
  344. s.broker.mq.on('mywill', function (packet, cb) {
  345. t.fail(packet)
  346. cb()
  347. })
  348. })
  349. test('deletes from persistence on DISCONNECT', function (t) {
  350. t.plan(2)
  351. const opts = {
  352. clientId: 'abcde'
  353. }
  354. const broker = aedes()
  355. t.teardown(broker.close.bind(broker))
  356. const s = noError(willConnect(setup(broker), opts, function () {
  357. s.inStream.end({
  358. cmd: 'disconnect'
  359. })
  360. }), t)
  361. s.broker.persistence.getWill({
  362. id: opts.clientId
  363. }, function (err, packet) {
  364. t.error(err, 'no error')
  365. t.notOk(packet)
  366. })
  367. })
  368. test('does not store multiple will with same clientid', function (t) {
  369. t.plan(4)
  370. const opts = { clientId: 'abcde' }
  371. const broker = aedes()
  372. let s = noError(willConnect(setup(broker), opts, function () {
  373. // gracefully close client so no will is sent
  374. s.inStream.end({
  375. cmd: 'disconnect'
  376. })
  377. }), t)
  378. broker.on('clientDisconnect', function (client) {
  379. // reconnect same client with will
  380. s = willConnect(setup(broker), opts, function () {
  381. // check that there are not 2 will messages for the same clientid
  382. s.broker.persistence.delWill({ id: opts.clientId }, function (err, packet) {
  383. t.error(err, 'no error')
  384. t.equal(packet.clientId, opts.clientId, 'will packet found')
  385. s.broker.persistence.delWill({ id: opts.clientId }, function (err, packet) {
  386. t.error(err, 'no error')
  387. t.equal(!!packet, false, 'no duplicated packets')
  388. broker.close()
  389. })
  390. })
  391. })
  392. })
  393. })
  394. test('don\'t delivers a will if broker alive', function (t) {
  395. const persistence = memory()
  396. const will = {
  397. topic: 'mywill',
  398. payload: Buffer.from('last will'),
  399. qos: 0,
  400. retain: false
  401. }
  402. const oldBroker = 'broker1'
  403. persistence.broker = {
  404. id: oldBroker
  405. }
  406. persistence.putWill({
  407. id: 'myClientId42'
  408. }, will, function (err) {
  409. t.error(err, 'no error')
  410. const opts = {
  411. persistence,
  412. heartbeatInterval: 10
  413. }
  414. let count = 0
  415. const broker = aedes(opts)
  416. t.teardown(broker.close.bind(broker))
  417. const streamWill = persistence.streamWill
  418. persistence.streamWill = function () {
  419. // don't pass broker.brokers to streamWill
  420. return streamWill.call(persistence)
  421. }
  422. broker.mq.on('mywill', function (packet, cb) {
  423. t.fail('Will received')
  424. cb()
  425. })
  426. broker.mq.on('$SYS/+/heartbeat', function () {
  427. t.pass('Heartbeat received')
  428. broker.brokers[oldBroker] = Date.now()
  429. if (++count === 5) {
  430. t.end()
  431. }
  432. })
  433. })
  434. })
  435. test('handle will publish error', function (t) {
  436. t.plan(2)
  437. const persistence = memory()
  438. const will = {
  439. topic: 'mywill',
  440. payload: Buffer.from('last will'),
  441. qos: 0,
  442. retain: false
  443. }
  444. persistence.broker = {
  445. id: 'broker1'
  446. }
  447. persistence.putWill({
  448. id: 'myClientId42'
  449. }, will, function (err) {
  450. t.error(err, 'no error')
  451. const opts = {
  452. persistence,
  453. heartbeatInterval: 10
  454. }
  455. persistence.delWill = function (client, cb) {
  456. cb(new Error('Throws error'))
  457. }
  458. const broker = aedes(opts)
  459. t.teardown(broker.close.bind(broker))
  460. broker.once('error', function (err) {
  461. t.equal('Throws error', err.message, 'throws error')
  462. })
  463. })
  464. })
  465. test('handle will publish error 2', function (t) {
  466. t.plan(2)
  467. const persistence = memory()
  468. const will = {
  469. topic: 'mywill',
  470. payload: Buffer.from('last will'),
  471. qos: 0,
  472. retain: true
  473. }
  474. persistence.broker = {
  475. id: 'broker1'
  476. }
  477. persistence.putWill({
  478. id: 'myClientId42'
  479. }, will, function (err) {
  480. t.error(err, 'no error')
  481. const opts = {
  482. persistence,
  483. heartbeatInterval: 10
  484. }
  485. persistence.storeRetained = function (packet, cb) {
  486. cb(new Error('Throws error'))
  487. }
  488. const broker = aedes(opts)
  489. t.teardown(broker.close.bind(broker))
  490. broker.once('error', function (err) {
  491. t.equal('Throws error', err.message, 'throws error')
  492. })
  493. })
  494. })