utils.js 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. 'use strict'
  2. const { Transform, Writable } = require('stream')
  3. function validateTopic (topic, message) {
  4. if (!topic || topic.length === 0) { // [MQTT-3.8.3-3]
  5. return new Error('impossible to ' + message + ' to an empty topic')
  6. }
  7. const end = topic.length - 1
  8. const endMinus = end - 1
  9. const slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47
  10. for (let i = 0; i < topic.length; i++) {
  11. switch (topic.charCodeAt(i)) {
  12. case 35: { // #
  13. const notAtTheEnd = i !== end
  14. if (notAtTheEnd || slashInPreEnd) {
  15. return new Error('# is only allowed in ' + message + ' in the last position')
  16. }
  17. break
  18. }
  19. case 43: { // +
  20. const pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47
  21. const preChar = i > 1 && topic.charCodeAt(i - 1) !== 47
  22. if (pastChar || preChar) {
  23. return new Error('+ is only allowed in ' + message + ' between /')
  24. }
  25. break
  26. }
  27. }
  28. }
  29. }
  30. function through (transform) {
  31. return new Transform({
  32. objectMode: true,
  33. transform
  34. })
  35. }
  36. function bulk (fn) {
  37. return new Writable({
  38. objectMode: true,
  39. writev: function (chunks, cb) {
  40. fn(chunks.map(chunk => chunk.chunk), cb)
  41. }
  42. })
  43. }
  44. module.exports = {
  45. validateTopic,
  46. through,
  47. bulk,
  48. $SYS_PREFIX: '$SYS/'
  49. }