topics.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. 'use strict'
  2. const { test } = require('tap')
  3. const { setup, connect, subscribe } = require('./helper')
  4. const aedes = require('../')
  5. const { validateTopic } = require('../lib/utils')
  6. test('validation of `null` topic', function (t) {
  7. // issue #780
  8. t.plan(1)
  9. const err = validateTopic(null, 'SUBSCRIBE')
  10. t.equal(err.message, 'impossible to SUBSCRIBE to an empty topic')
  11. })
  12. // [MQTT-4.7.1-3]
  13. test('Single-level wildcard should match empty level', function (t) {
  14. t.plan(4)
  15. const s = connect(setup())
  16. t.teardown(s.broker.close.bind(s.broker))
  17. subscribe(t, s, 'a/+/b', 0, function () {
  18. s.outStream.once('data', function (packet) {
  19. t.pass('ok')
  20. })
  21. s.inStream.write({
  22. cmd: 'publish',
  23. topic: 'a//b',
  24. payload: 'world'
  25. })
  26. })
  27. })
  28. // [MQTT-4.7.3-1]
  29. test('publish empty topic', function (t) {
  30. t.plan(4)
  31. const s = connect(setup())
  32. subscribe(t, s, '#', 0, function () {
  33. s.outStream.once('data', function (packet) {
  34. t.fail('no packet')
  35. })
  36. s.inStream.write({
  37. cmd: 'publish',
  38. topic: '',
  39. payload: 'world'
  40. })
  41. s.broker.close(function () {
  42. t.equal(s.broker.connectedClients, 0, 'no connected clients')
  43. })
  44. })
  45. })
  46. test('publish invalid topic with #', function (t) {
  47. t.plan(4)
  48. const s = connect(setup())
  49. t.teardown(s.broker.close.bind(s.broker))
  50. subscribe(t, s, '#', 0, function () {
  51. s.outStream.once('data', function (packet) {
  52. t.fail('no packet')
  53. })
  54. s.inStream.write({
  55. cmd: 'publish',
  56. topic: 'hello/#',
  57. payload: 'world'
  58. })
  59. })
  60. s.broker.on('clientError', function () {
  61. t.pass('raise an error')
  62. })
  63. })
  64. test('publish invalid topic with +', function (t) {
  65. t.plan(4)
  66. const s = connect(setup())
  67. t.teardown(s.broker.close.bind(s.broker))
  68. subscribe(t, s, '#', 0, function () {
  69. s.outStream.once('data', function (packet) {
  70. t.fail('no packet')
  71. })
  72. s.inStream.write({
  73. cmd: 'publish',
  74. topic: 'hello/+/eee',
  75. payload: 'world'
  76. })
  77. })
  78. s.broker.on('clientError', function () {
  79. t.pass('raise an error')
  80. })
  81. })
  82. ;['base/#/sub', 'base/#sub', 'base/sub#', 'base/xyz+/sub', 'base/+xyz/sub', ''].forEach(function (topic) {
  83. test('subscribe to invalid topic with "' + topic + '"', function (t) {
  84. t.plan(1)
  85. const s = connect(setup())
  86. t.teardown(s.broker.close.bind(s.broker))
  87. s.broker.on('clientError', function () {
  88. t.pass('raise an error')
  89. })
  90. s.inStream.write({
  91. cmd: 'subscribe',
  92. messageId: 24,
  93. subscriptions: [{
  94. topic,
  95. qos: 0
  96. }]
  97. })
  98. })
  99. test('unsubscribe to invalid topic with "' + topic + '"', function (t) {
  100. t.plan(1)
  101. const s = connect(setup())
  102. t.teardown(s.broker.close.bind(s.broker))
  103. s.broker.on('clientError', function () {
  104. t.pass('raise an error')
  105. })
  106. s.inStream.write({
  107. cmd: 'unsubscribe',
  108. messageId: 24,
  109. unsubscriptions: [topic]
  110. })
  111. })
  112. })
  113. test('topics are case-sensitive', function (t) {
  114. t.plan(4)
  115. const broker = aedes()
  116. t.teardown(broker.close.bind(broker))
  117. const publisher = connect(setup(broker), { clean: true })
  118. const subscriber = connect(setup(broker), { clean: true })
  119. const expected = {
  120. cmd: 'publish',
  121. topic: 'hello',
  122. payload: Buffer.from('world'),
  123. qos: 0,
  124. dup: false,
  125. length: 12,
  126. retain: false
  127. }
  128. subscribe(t, subscriber, 'hello', 0, function () {
  129. subscriber.outStream.on('data', function (packet) {
  130. t.same(packet, expected, 'packet mush match')
  131. })
  132. ;['hello', 'HELLO', 'heLLo', 'HELLO/#', 'hello/+'].forEach(function (topic) {
  133. publisher.inStream.write({
  134. cmd: 'publish',
  135. topic,
  136. payload: 'world',
  137. qos: 0,
  138. retain: false
  139. })
  140. })
  141. })
  142. })
  143. function subscribeMultipleTopics (t, broker, qos, subscriber, subscriptions, done) {
  144. const publisher = connect(setup(broker))
  145. subscriber.inStream.write({
  146. cmd: 'subscribe',
  147. messageId: 24,
  148. subscriptions
  149. })
  150. subscriber.outStream.once('data', function (packet) {
  151. t.equal(packet.cmd, 'suback')
  152. t.same(packet.granted, subscriptions.map(obj => obj.qos))
  153. t.equal(packet.messageId, 24)
  154. publisher.inStream.write({
  155. cmd: 'publish',
  156. topic: 'hello/world',
  157. payload: 'world',
  158. qos,
  159. messageId: 42
  160. })
  161. if (done) {
  162. done(null, packet)
  163. }
  164. })
  165. }
  166. test('Overlapped topics with same QoS', function (t) {
  167. t.plan(4)
  168. const broker = aedes()
  169. t.teardown(broker.close.bind(broker))
  170. const subscriber = connect(setup(broker))
  171. const expected = {
  172. cmd: 'publish',
  173. topic: 'hello/world',
  174. payload: Buffer.from('world'),
  175. qos: 1,
  176. dup: false,
  177. length: 20,
  178. retain: false
  179. }
  180. const sub = [
  181. { topic: 'hello/world', qos: 1 },
  182. { topic: 'hello/#', qos: 1 }]
  183. subscribeMultipleTopics(t, broker, 1, subscriber, sub, function () {
  184. subscriber.outStream.on('data', function (packet) {
  185. delete packet.messageId
  186. t.same(packet, expected, 'packet must match')
  187. })
  188. })
  189. })
  190. // [MQTT-3.3.5-1]
  191. test('deliver overlapped topics respecting the maximum QoS of all the matching subscriptions - QoS 0 publish', function (t) {
  192. t.plan(4)
  193. const broker = aedes()
  194. t.teardown(broker.close.bind(broker))
  195. const subscriber = connect(setup(broker))
  196. const expected = {
  197. cmd: 'publish',
  198. topic: 'hello/world',
  199. payload: Buffer.from('world'),
  200. qos: 0,
  201. dup: false,
  202. length: 18,
  203. retain: false
  204. }
  205. const sub = [
  206. { topic: 'hello/world', qos: 0 },
  207. { topic: 'hello/#', qos: 2 }]
  208. subscribeMultipleTopics(t, broker, 0, subscriber, sub, function () {
  209. subscriber.outStream.on('data', function (packet) {
  210. delete packet.messageId
  211. t.same(packet, expected, 'packet must match')
  212. })
  213. })
  214. })
  215. // [MQTT-3.3.5-1]
  216. test('deliver overlapped topics respecting the maximum QoS of all the matching subscriptions - QoS 2 publish', function (t) {
  217. t.plan(3)
  218. const broker = aedes()
  219. t.teardown(broker.close.bind(broker))
  220. const subscriber = connect(setup(broker))
  221. const sub = [
  222. { topic: 'hello/world', qos: 0 },
  223. { topic: 'hello/#', qos: 2 }]
  224. subscribeMultipleTopics(t, broker, 2, subscriber, sub, function () {
  225. subscriber.outStream.on('data', function () {
  226. t.fail('should receive messages with the maximum QoS')
  227. })
  228. })
  229. })
  230. test('Overlapped topics with QoS downgrade', function (t) {
  231. t.plan(4)
  232. const broker = aedes()
  233. t.teardown(broker.close.bind(broker))
  234. const subscriber = connect(setup(broker))
  235. const expected = {
  236. cmd: 'publish',
  237. topic: 'hello/world',
  238. payload: Buffer.from('world'),
  239. qos: 0,
  240. dup: false,
  241. length: 18,
  242. retain: false
  243. }
  244. const sub = [
  245. { topic: 'hello/world', qos: 1 },
  246. { topic: 'hello/#', qos: 1 }]
  247. subscribeMultipleTopics(t, broker, 0, subscriber, sub, function () {
  248. subscriber.outStream.on('data', function (packet) {
  249. t.same(packet, expected, 'packet must match')
  250. })
  251. })
  252. })