not-blocking.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. 'use strict'
  2. const { test } = require('tap')
  3. const EventEmitter = require('events')
  4. const mqtt = require('mqtt')
  5. const net = require('net')
  6. const Faketimers = require('@sinonjs/fake-timers')
  7. const aedes = require('../')
  8. test('connect 500 concurrent clients', function (t) {
  9. t.plan(3)
  10. const evt = new EventEmitter()
  11. const broker = aedes()
  12. const server = net.createServer(broker.handle)
  13. const total = 500
  14. server.listen(0, function (err) {
  15. t.error(err, 'no error')
  16. const clock = Faketimers.createClock()
  17. t.teardown(clock.reset.bind(clock))
  18. const port = server.address().port
  19. let connected = 0
  20. const clients = []
  21. clock.setTimeout(function () {
  22. t.equal(clients.length, total)
  23. t.equal(connected, total)
  24. while (clients.length) {
  25. clients.shift().end()
  26. }
  27. }, total)
  28. evt.on('finish', function () {
  29. if (clients.length === 0) {
  30. broker.close()
  31. server.close()
  32. }
  33. })
  34. for (let i = 0; i < total; i++) {
  35. clients[i] = mqtt.connect({
  36. port,
  37. keepalive: 0,
  38. reconnectPeriod: 100
  39. }).on('connect', function () {
  40. connected++
  41. if ((connected % (total / 10)) === 0) {
  42. console.log('connected', connected)
  43. }
  44. clock.tick(1)
  45. }).on('close', function () {
  46. evt.emit('finish')
  47. })
  48. }
  49. })
  50. })
  51. test('do not block after a subscription', function (t) {
  52. t.plan(3)
  53. const evt = new EventEmitter()
  54. const broker = aedes()
  55. const server = net.createServer(broker.handle)
  56. const total = 10000
  57. let sent = 0
  58. let received = 0
  59. server.listen(0, function (err) {
  60. t.error(err, 'no error')
  61. const clock = Faketimers.createClock()
  62. t.teardown(clock.reset.bind(clock))
  63. const clockId = clock.setTimeout(finish, total)
  64. const port = server.address().port
  65. const publisher = mqtt.connect({
  66. port,
  67. keepalive: 0
  68. }).on('error', function (err) {
  69. clock.clearTimeout(clockId)
  70. t.fail(err)
  71. })
  72. let subscriber
  73. function immediatePublish () {
  74. setImmediate(publish)
  75. }
  76. function publish () {
  77. if (sent === total) {
  78. publisher.end()
  79. } else {
  80. sent++
  81. publisher.publish('test', 'payload', immediatePublish)
  82. }
  83. }
  84. function startSubscriber () {
  85. subscriber = mqtt.connect({
  86. port,
  87. keepalive: 0
  88. }).on('error', function (err) {
  89. clock.clearTimeout(clockId)
  90. t.fail(err)
  91. })
  92. subscriber.subscribe('test', publish)
  93. subscriber.on('message', function () {
  94. if (received % (total / 10) === 0) {
  95. console.log('sent / received', sent, received)
  96. }
  97. received++
  98. clock.tick(1)
  99. })
  100. subscriber.on('close', function () {
  101. evt.emit('finish')
  102. })
  103. }
  104. publisher.on('connect', startSubscriber)
  105. publisher.on('close', function () {
  106. evt.emit('finish')
  107. })
  108. evt.on('finish', function () {
  109. if (publisher.connected || subscriber.connected) { return }
  110. broker.close()
  111. server.close()
  112. t.equal(total, sent, 'messages sent')
  113. t.equal(total, received, 'messages received')
  114. })
  115. function finish () {
  116. subscriber.end()
  117. publisher.end()
  118. }
  119. })
  120. })
  121. test('do not block with overlapping subscription', function (t) {
  122. t.plan(3)
  123. const evt = new EventEmitter()
  124. const broker = aedes({ concurrency: 15 })
  125. const server = net.createServer(broker.handle)
  126. const total = 10000
  127. let sent = 0
  128. let received = 0
  129. server.listen(0, function (err) {
  130. t.error(err, 'no error')
  131. const clock = Faketimers.createClock()
  132. t.teardown(clock.reset.bind(clock))
  133. const clockId = clock.setTimeout(finish, total)
  134. const port = server.address().port
  135. const publisher = mqtt.connect({
  136. port,
  137. keepalive: 0
  138. }).on('error', function (err) {
  139. clock.clearTimeout(clockId)
  140. t.fail(err)
  141. })
  142. let subscriber
  143. function immediatePublish (e) {
  144. setImmediate(publish)
  145. }
  146. function publish () {
  147. if (sent === total) {
  148. publisher.end()
  149. } else {
  150. sent++
  151. publisher.publish('test', 'payload', immediatePublish)
  152. }
  153. }
  154. function startSubscriber () {
  155. subscriber = mqtt.connect({
  156. port,
  157. keepalive: 0
  158. }).on('error', function (err) {
  159. clock.clearTimeout(clockId)
  160. t.fail(err)
  161. })
  162. subscriber.subscribe('#', function () {
  163. subscriber.subscribe('test', function () {
  164. immediatePublish()
  165. })
  166. })
  167. subscriber.on('message', function () {
  168. if (received % (total / 10) === 0) {
  169. console.log('sent / received', sent, received)
  170. }
  171. received++
  172. clock.tick(1)
  173. })
  174. subscriber.on('close', function () {
  175. evt.emit('finish')
  176. })
  177. }
  178. publisher.on('connect', startSubscriber)
  179. publisher.on('close', function () {
  180. evt.emit('finish')
  181. })
  182. evt.on('finish', function () {
  183. if (publisher.connected || subscriber.connected) { return }
  184. broker.close()
  185. server.close()
  186. t.equal(total, sent, 'messages sent')
  187. t.equal(total, received, 'messages received')
  188. })
  189. function finish () {
  190. subscriber.end()
  191. publisher.end()
  192. }
  193. })
  194. })