helper.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. 'use strict'
  2. const duplexify = require('duplexify')
  3. const mqtt = require('mqtt-connection')
  4. const { through } = require('../lib/utils')
  5. const util = require('util')
  6. const aedes = require('../')
  7. const parseStream = mqtt.parseStream
  8. const generateStream = mqtt.generateStream
  9. let clients = 0
  10. function setup (broker) {
  11. const inStream = generateStream()
  12. const outStream = parseStream()
  13. const conn = duplexify(outStream, inStream)
  14. broker = broker || aedes()
  15. return {
  16. client: broker.handle(conn),
  17. conn,
  18. inStream,
  19. outStream,
  20. broker
  21. }
  22. }
  23. function connect (s, opts, connected) {
  24. s = Object.create(s)
  25. s.outStream = s.outStream.pipe(through(filter))
  26. opts = opts || {}
  27. opts.cmd = 'connect'
  28. opts.protocolId = opts.protocolId || 'MQTT'
  29. opts.protocolVersion = opts.protocolVersion || 4
  30. opts.clean = !!opts.clean
  31. opts.clientId = opts.clientId || 'my-client-' + clients++
  32. opts.keepalive = opts.keepalive || 0
  33. s.inStream.write(opts)
  34. return s
  35. function filter (packet, enc, cb) {
  36. if (packet.cmd !== 'publish') {
  37. delete packet.topic
  38. delete packet.payload
  39. }
  40. // using setImmediate to wait for connected to be fired
  41. // setup also needs to return first
  42. if (packet.cmd !== 'connack') {
  43. setImmediate(this.push.bind(this, packet))
  44. } else if (connected && packet.returnCode === 0) {
  45. setImmediate(connected, packet)
  46. }
  47. cb()
  48. }
  49. }
  50. function noError (s, t) {
  51. s.broker.on('clientError', function (client, err) {
  52. if (err) throw err
  53. t.notOk(err, 'must not error')
  54. })
  55. return s
  56. }
  57. function subscribe (t, subscriber, topic, qos, done) {
  58. subscriber.inStream.write({
  59. cmd: 'subscribe',
  60. messageId: 24,
  61. subscriptions: [{
  62. topic,
  63. qos
  64. }]
  65. })
  66. subscriber.outStream.once('data', function (packet) {
  67. t.equal(packet.cmd, 'suback')
  68. t.same(packet.granted, [qos])
  69. t.equal(packet.messageId, 24)
  70. if (done) {
  71. done(null, packet)
  72. }
  73. })
  74. }
  75. // subs: [{topic:, qos:}]
  76. function subscribeMultiple (t, subscriber, subs, expectedGranted, done) {
  77. subscriber.inStream.write({
  78. cmd: 'subscribe',
  79. messageId: 24,
  80. subscriptions: subs
  81. })
  82. subscriber.outStream.once('data', function (packet) {
  83. t.equal(packet.cmd, 'suback')
  84. t.same(packet.granted, expectedGranted)
  85. t.equal(packet.messageId, 24)
  86. if (done) {
  87. done(null, packet)
  88. }
  89. })
  90. }
  91. module.exports = {
  92. setup,
  93. connect,
  94. noError,
  95. subscribe,
  96. subscribeMultiple,
  97. delay: util.promisify(setTimeout)
  98. }