mqemitter.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. 'use strict'
  2. const { Qlobber } = require('qlobber')
  3. const assert = require('assert')
  4. const fastparallel = require('fastparallel')
  /**
   * Topic-based emitter. Listeners are stored in a Qlobber matcher and
   * dispatched through a fastparallel runner.
   *
   * The `opts` object that is passed in is updated in place with the
   * defaults below.
   *
   * @param {object} [opts]
   * @param {boolean} [opts.matchEmptyLevels=true] forwarded to Qlobber as `match_empty_levels`
   * @param {string} [opts.separator='/'] forwarded to Qlobber as `separator`
   * @param {string} [opts.wildcardOne='+'] forwarded to Qlobber as `wildcard_one`
   * @param {string} [opts.wildcardSome='#'] forwarded to Qlobber as `wildcard_some`
   * @param {number} [opts.concurrency=0] stored as `this.concurrency`; `emit` only queues when it is > 0
   * @returns {MQEmitter} a new instance, also when called without `new`
   */
  function MQEmitter (opts) {
    // Support plain function calls: MQEmitter(opts)
    if (!(this instanceof MQEmitter)) {
      return new MQEmitter(opts)
    }

    const self = this

    // Handed to fastparallel as its `released` option and also exposed as
    // `this._released`: frees one slot, then either starts the next queued
    // message or marks the emitter as idle.
    function released () {
      self.current--

      const nextMessage = self._messageQueue.shift()
      const nextCallback = self._messageCallbacks.shift()

      if (!nextMessage) {
        self._doing = false
        return
      }

      self._do(nextMessage, nextCallback)
    }

    opts = opts || {}
    // Only an explicit non-undefined value can turn empty-level matching off.
    opts.matchEmptyLevels = opts.matchEmptyLevels === undefined || !!opts.matchEmptyLevels
    opts.separator = opts.separator || '/'
    opts.wildcardOne = opts.wildcardOne || '+'
    opts.wildcardSome = opts.wildcardSome || '#'

    // Messages and their callbacks are kept in two parallel FIFO arrays.
    this._messageQueue = []
    this._messageCallbacks = []

    this._parallel = fastparallel({ results: false, released })

    this.concurrency = opts.concurrency || 0
    this.current = 0
    this._doing = false

    const matcherOptions = {
      match_empty_levels: opts.matchEmptyLevels,
      separator: opts.separator,
      wildcard_one: opts.wildcardOne,
      wildcard_some: opts.wildcardSome
    }
    this._matcher = new Qlobber(matcherOptions)

    this.closed = false
    this._released = released
  }
  // `length` is a read-only, enumerable accessor on the prototype that
  // reports how many messages are currently waiting in the queue.
  Object.defineProperty(MQEmitter.prototype, 'length', {
    enumerable: true,
    get () {
      const queued = this._messageQueue
      return queued.length
    }
  })
  /**
   * Registers `notify` as a listener for `topic`.
   *
   * The listener is added to the matcher synchronously; `done`, when given,
   * is scheduled with `setImmediate`.
   *
   * @param {string} topic topic pattern (must be truthy)
   * @param {Function} notify listener to register (must be truthy)
   * @param {Function} [done] called once the subscription is in place
   * @returns {MQEmitter} this emitter, for chaining
   */
  MQEmitter.prototype.on = function on (topic, notify, done) {
    assert(topic)
    assert(notify)

    const matcher = this._matcher
    matcher.add(topic, notify)

    if (!done) {
      return this
    }

    setImmediate(done)
    return this
  }
  /**
   * Unregisters `notify` from `topic`.
   *
   * The removal itself is deferred with `setImmediate`; `done`, when given,
   * is called right after the matcher has been updated.
   *
   * @param {string} topic topic pattern (must be truthy)
   * @param {Function} notify listener to remove (must be truthy)
   * @param {Function} [done] called after the listener has been removed
   * @returns {MQEmitter} this emitter, for chaining
   */
  MQEmitter.prototype.removeListener = function removeListener (topic, notify, done) {
    assert(topic)
    assert(notify)

    setImmediate(() => {
      this._matcher.remove(topic, notify)

      if (!done) {
        return
      }
      done()
    })

    return this
  }
  /**
   * Removes every listener registered for `topic`.
   *
   * The matcher is updated synchronously; `done`, when given, is scheduled
   * with `setImmediate`.
   *
   * @param {string} topic topic pattern (must be truthy)
   * @param {Function} [done] called once the listeners have been removed
   * @returns {MQEmitter} this emitter, for chaining
   */
  MQEmitter.prototype.removeAllListeners = function removeAllListeners (topic, done) {
    assert(topic)

    // Calling remove() with the topic only (no listener argument).
    this._matcher.remove(topic)

    if (done) {
      setImmediate(done)
    }

    return this
  }
  /**
   * Publishes `message` to the listeners matching `message.topic`.
   *
   * If the emitter is closed, `cb` is called synchronously with an error.
   * If `concurrency` is positive and `current` has reached it, the message
   * and its callback are queued; otherwise the message is dispatched
   * right away through `_do`.
   *
   * @param {object} message message to publish (must be truthy)
   * @param {Function} [cb] completion callback; defaults to a no-op
   * @returns {MQEmitter} this emitter, for chaining (on every path)
   */
  MQEmitter.prototype.emit = function emit (message, cb) {
    assert(message)

    cb = cb || noop

    if (this.closed) {
      // Report the error but still return the emitter, so that chaining
      // behaves the same as on the other paths.
      cb(new Error('mqemitter is closed'))
      return this
    }

    if (this.concurrency > 0 && this.current >= this.concurrency) {
      this._messageQueue.push(message)
      this._messageCallbacks.push(cb)
      if (!this._doing) {
        // The concurrency limit is reached although nothing is marked as
        // running: warn, then release a slot so the queue keeps draining.
        process.emitWarning('MqEmitter leak detected', { detail: 'For more info check: https://github.com/mcollina/mqemitter/pull/94' })
        this._released()
      }
    } else {
      this._do(message, cb)
    }

    return this
  }
  /**
   * Marks the emitter as closed; later `emit` calls report an error to
   * their callback instead of dispatching.
   *
   * @param {Function} [cb] scheduled with `setImmediate` once closed
   * @returns {MQEmitter} this emitter, for chaining
   */
  MQEmitter.prototype.close = function close (cb) {
    this.closed = true

    // The callback is optional, like `done` in the other methods;
    // setImmediate() throws when it is not given a function.
    if (cb) {
      setImmediate(cb)
    }

    return this
  }
  /**
   * Dispatches one message: marks the emitter as busy, looks up the
   * listeners matching `message.topic`, takes a concurrency slot and hands
   * the listeners to the parallel runner.
   *
   * @param {object} message message to deliver; its `topic` is matched
   * @param {Function} callback passed on to the parallel runner
   * @returns {MQEmitter} this emitter
   */
  MQEmitter.prototype._do = function (message, callback) {
    this._doing = true

    // The slot is taken only after matching has been done.
    const listeners = this._matcher.match(message.topic)
    this.current += 1

    this._parallel(this, listeners, message, callback)

    return this
  }
  // Default callback: accepts anything, does nothing, returns undefined.
  function noop () {
  }
  109. module.exports = MQEmitter