events.js 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. 'use strict'
  2. const { test } = require('tap')
  3. const mqemitter = require('mqemitter')
  4. const { setup, connect, subscribe } = require('./helper')
  5. const aedes = require('../')
  6. test('publishes an hearbeat', function (t) {
  7. t.plan(2)
  8. const broker = aedes({
  9. heartbeatInterval: 10 // ms
  10. })
  11. t.teardown(broker.close.bind(broker))
  12. broker.subscribe('$SYS/+/heartbeat', function (message, cb) {
  13. const id = message.topic.match(/\$SYS\/([^/]+)\/heartbeat/)[1]
  14. t.equal(id, broker.id, 'broker id matches')
  15. t.same(message.payload.toString(), id, 'message has id as the payload')
  16. cb()
  17. })
  18. })
  19. test('publishes birth', function (t) {
  20. t.plan(4)
  21. const mq = mqemitter()
  22. const brokerId = 'test-broker'
  23. const fakeBroker = 'fake-broker'
  24. const clientId = 'test-client'
  25. mq.on(`$SYS/${brokerId}/birth`, (message, cb) => {
  26. t.pass('broker birth received')
  27. t.same(message.payload.toString(), brokerId, 'message has id as the payload')
  28. cb()
  29. })
  30. const broker = aedes({
  31. id: brokerId,
  32. mq
  33. })
  34. broker.on('client', (client) => {
  35. t.equal(client.id, clientId, 'client connected')
  36. // set a fake counter on a fake broker
  37. process.nextTick(() => {
  38. broker.clients[clientId].duplicates[fakeBroker] = 42
  39. mq.emit({ topic: `$SYS/${fakeBroker}/birth`, payload: Buffer.from(fakeBroker) })
  40. })
  41. })
  42. mq.on(`$SYS/${fakeBroker}/birth`, (message, cb) => {
  43. process.nextTick(() => {
  44. t.equal(!!broker.clients[clientId].duplicates[fakeBroker], false, 'client duplicates has been resetted')
  45. cb()
  46. })
  47. })
  48. const s = connect(setup(broker), { clientId })
  49. t.teardown(s.broker.close.bind(s.broker))
  50. })
  51. ;['$mcollina', '$SYS'].forEach(function (topic) {
  52. test('does not forward $ prefixed topics to # subscription - ' + topic, function (t) {
  53. t.plan(4)
  54. const s = connect(setup())
  55. t.teardown(s.broker.close.bind(s.broker))
  56. subscribe(t, s, '#', 0, function () {
  57. s.outStream.once('data', function (packet) {
  58. t.fail('no packet should be received')
  59. })
  60. s.broker.mq.emit({
  61. cmd: 'publish',
  62. topic: topic + '/hello',
  63. payload: 'world'
  64. }, function () {
  65. t.pass('nothing happened')
  66. })
  67. })
  68. })
  69. test('does not forward $ prefixed topics to +/# subscription - ' + topic, function (t) {
  70. t.plan(4)
  71. const s = connect(setup())
  72. t.teardown(s.broker.close.bind(s.broker))
  73. subscribe(t, s, '+/#', 0, function () {
  74. s.outStream.once('data', function (packet) {
  75. t.fail('no packet should be received')
  76. })
  77. s.broker.mq.emit({
  78. cmd: 'publish',
  79. topic: topic + '/hello',
  80. payload: 'world'
  81. }, function () {
  82. t.pass('nothing happened')
  83. })
  84. })
  85. })
  86. })
  87. test('does not store $SYS topics to QoS 1 # subscription', function (t) {
  88. t.plan(3)
  89. const broker = aedes()
  90. t.teardown(broker.close.bind(broker))
  91. const opts = { clean: false, clientId: 'abcde' }
  92. let s = connect(setup(broker), opts)
  93. subscribe(t, s, '#', 1, function () {
  94. s.inStream.end()
  95. s.broker.publish({
  96. cmd: 'publish',
  97. topic: '$SYS/hello',
  98. payload: 'world',
  99. qos: 1
  100. }, function () {
  101. s = connect(setup(broker), { clean: false, clientId: 'abcde' })
  102. s.outStream.once('data', function (packet) {
  103. t.fail('no packet should be received')
  104. })
  105. })
  106. })
  107. })
  108. test('Emit event when receives a ping', { timeout: 2000 }, function (t) {
  109. t.plan(5)
  110. const broker = aedes()
  111. t.teardown(broker.close.bind(broker))
  112. broker.on('ping', function (packet, client) {
  113. if (client && client) {
  114. t.equal(client.id, 'abcde')
  115. t.equal(packet.cmd, 'pingreq')
  116. t.equal(packet.payload, null)
  117. t.equal(packet.topic, null)
  118. t.equal(packet.length, 0)
  119. }
  120. })
  121. const s = connect(setup(broker), { clientId: 'abcde' })
  122. s.inStream.write({
  123. cmd: 'pingreq'
  124. })
  125. })
  126. test('Emit event when broker closed', function (t) {
  127. t.plan(1)
  128. const broker = aedes()
  129. broker.once('closed', function () {
  130. t.ok(true)
  131. })
  132. broker.close()
  133. })
  134. test('Emit closed event one only when double broker.close()', function (t) {
  135. t.plan(4)
  136. const broker = aedes()
  137. broker.on('closed', function () {
  138. t.pass('closed')
  139. })
  140. t.notOk(broker.closed)
  141. broker.close()
  142. t.ok(broker.closed)
  143. broker.close()
  144. t.ok(broker.closed)
  145. })
  146. test('Test backpressure aedes published function', function (t) {
  147. t.plan(2)
  148. let publishCount = 10
  149. let count = 0
  150. const broker = aedes({
  151. published: function (packet, client, done) {
  152. if (client) {
  153. count++
  154. setTimeout(() => {
  155. publisher.end()
  156. done()
  157. })
  158. } else { done() }
  159. }
  160. })
  161. const mqtt = require('mqtt')
  162. const server = require('net').createServer(broker.handle)
  163. let publisher
  164. server.listen(0, function () {
  165. const port = server.address().port
  166. publisher = mqtt.connect({ port, host: 'localhost', clean: true, keepalive: 30 })
  167. function next () {
  168. if (--publishCount > 0) { process.nextTick(publish) }
  169. }
  170. function publish () {
  171. publisher.publish('test', 'payload', next)
  172. }
  173. publisher.on('connect', publish)
  174. publisher.on('end', function () {
  175. t.ok(count > publishCount)
  176. t.equal(publishCount, 0)
  177. broker.close()
  178. server.close()
  179. })
  180. })
  181. })
  182. test('clear closed clients when the same clientId is managed by another broker', function (t) {
  183. t.plan(2)
  184. const clientId = 'closed-client'
  185. const aedesBroker = aedes()
  186. // simulate a closed client on the broker
  187. aedesBroker.clients[clientId] = { closed: true, broker: aedesBroker }
  188. aedesBroker.connectedClients = 1
  189. // simulate the creation of the same client on another broker of the cluster
  190. aedesBroker.publish({ topic: '$SYS/anotherbroker/new/clients', payload: clientId }, () => {
  191. t.equal(aedesBroker.clients[clientId], undefined) // check that the closed client was removed
  192. t.equal(aedesBroker.connectedClients, 0)
  193. })
  194. t.teardown(aedesBroker.close.bind(aedesBroker))
  195. })