writeToStream.js 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. const protocol = require('./constants')
  2. const { Buffer } = require('buffer')
  3. const empty = Buffer.allocUnsafe(0)
  4. const zeroBuf = Buffer.from([0])
  5. const numbers = require('./numbers')
  6. const nextTick = require('process-nextick-args').nextTick
  7. const debug = require('debug')('mqtt-packet:writeToStream')
  8. const numCache = numbers.cache
  9. const generateNumber = numbers.generateNumber
  10. const generateCache = numbers.generateCache
  11. const genBufVariableByteInt = numbers.genBufVariableByteInt
  12. const generate4ByteBuffer = numbers.generate4ByteBuffer
  13. let writeNumber = writeNumberCached
  14. let toGenerate = true
  15. function generate (packet, stream, opts) {
  16. debug('generate called')
  17. if (stream.cork) {
  18. stream.cork()
  19. nextTick(uncork, stream)
  20. }
  21. if (toGenerate) {
  22. toGenerate = false
  23. generateCache()
  24. }
  25. debug('generate: packet.cmd: %s', packet.cmd)
  26. switch (packet.cmd) {
  27. case 'connect':
  28. return connect(packet, stream, opts)
  29. case 'connack':
  30. return connack(packet, stream, opts)
  31. case 'publish':
  32. return publish(packet, stream, opts)
  33. case 'puback':
  34. case 'pubrec':
  35. case 'pubrel':
  36. case 'pubcomp':
  37. return confirmation(packet, stream, opts)
  38. case 'subscribe':
  39. return subscribe(packet, stream, opts)
  40. case 'suback':
  41. return suback(packet, stream, opts)
  42. case 'unsubscribe':
  43. return unsubscribe(packet, stream, opts)
  44. case 'unsuback':
  45. return unsuback(packet, stream, opts)
  46. case 'pingreq':
  47. case 'pingresp':
  48. return emptyPacket(packet, stream, opts)
  49. case 'disconnect':
  50. return disconnect(packet, stream, opts)
  51. case 'auth':
  52. return auth(packet, stream, opts)
  53. default:
  54. stream.destroy(new Error('Unknown command'))
  55. return false
  56. }
  57. }
  58. /**
  59. * Controls numbers cache.
  60. * Set to "false" to allocate buffers on-the-flight instead of pre-generated cache
  61. */
  62. Object.defineProperty(generate, 'cacheNumbers', {
  63. get () {
  64. return writeNumber === writeNumberCached
  65. },
  66. set (value) {
  67. if (value) {
  68. if (!numCache || Object.keys(numCache).length === 0) toGenerate = true
  69. writeNumber = writeNumberCached
  70. } else {
  71. toGenerate = false
  72. writeNumber = writeNumberGenerated
  73. }
  74. }
  75. })
  76. function uncork (stream) {
  77. stream.uncork()
  78. }
  79. function connect (packet, stream, opts) {
  80. const settings = packet || {}
  81. const protocolId = settings.protocolId || 'MQTT'
  82. let protocolVersion = settings.protocolVersion || 4
  83. const will = settings.will
  84. let clean = settings.clean
  85. const keepalive = settings.keepalive || 0
  86. const clientId = settings.clientId || ''
  87. const username = settings.username
  88. const password = settings.password
  89. /* mqtt5 new oprions */
  90. const properties = settings.properties
  91. if (clean === undefined) clean = true
  92. let length = 0
  93. // Must be a string and non-falsy
  94. if (!protocolId ||
  95. (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
  96. stream.destroy(new Error('Invalid protocolId'))
  97. return false
  98. } else length += protocolId.length + 2
  99. // Must be 3 or 4 or 5
  100. if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
  101. stream.destroy(new Error('Invalid protocol version'))
  102. return false
  103. } else length += 1
  104. // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1
  105. if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&
  106. (clientId || protocolVersion >= 4) && (clientId || clean)) {
  107. length += Buffer.byteLength(clientId) + 2
  108. } else {
  109. if (protocolVersion < 4) {
  110. stream.destroy(new Error('clientId must be supplied before 3.1.1'))
  111. return false
  112. }
  113. if ((clean * 1) === 0) {
  114. stream.destroy(new Error('clientId must be given if cleanSession set to 0'))
  115. return false
  116. }
  117. }
  118. // Must be a two byte number
  119. if (typeof keepalive !== 'number' ||
  120. keepalive < 0 ||
  121. keepalive > 65535 ||
  122. keepalive % 1 !== 0) {
  123. stream.destroy(new Error('Invalid keepalive'))
  124. return false
  125. } else length += 2
  126. // Connect flags
  127. length += 1
  128. let propertiesData
  129. let willProperties
  130. // Properties
  131. if (protocolVersion === 5) {
  132. propertiesData = getProperties(stream, properties)
  133. if (!propertiesData) { return false }
  134. length += propertiesData.length
  135. }
  136. // If will exists...
  137. if (will) {
  138. // It must be an object
  139. if (typeof will !== 'object') {
  140. stream.destroy(new Error('Invalid will'))
  141. return false
  142. }
  143. // It must have topic typeof string
  144. if (!will.topic || typeof will.topic !== 'string') {
  145. stream.destroy(new Error('Invalid will topic'))
  146. return false
  147. } else {
  148. length += Buffer.byteLength(will.topic) + 2
  149. }
  150. // Payload
  151. length += 2 // payload length
  152. if (will.payload) {
  153. if (will.payload.length >= 0) {
  154. if (typeof will.payload === 'string') {
  155. length += Buffer.byteLength(will.payload)
  156. } else {
  157. length += will.payload.length
  158. }
  159. } else {
  160. stream.destroy(new Error('Invalid will payload'))
  161. return false
  162. }
  163. }
  164. // will properties
  165. willProperties = {}
  166. if (protocolVersion === 5) {
  167. willProperties = getProperties(stream, will.properties)
  168. if (!willProperties) { return false }
  169. length += willProperties.length
  170. }
  171. }
  172. // Username
  173. let providedUsername = false
  174. if (username != null) {
  175. if (isStringOrBuffer(username)) {
  176. providedUsername = true
  177. length += Buffer.byteLength(username) + 2
  178. } else {
  179. stream.destroy(new Error('Invalid username'))
  180. return false
  181. }
  182. }
  183. // Password
  184. if (password != null) {
  185. if (!providedUsername) {
  186. stream.destroy(new Error('Username is required to use password'))
  187. return false
  188. }
  189. if (isStringOrBuffer(password)) {
  190. length += byteLength(password) + 2
  191. } else {
  192. stream.destroy(new Error('Invalid password'))
  193. return false
  194. }
  195. }
  196. // Generate header
  197. stream.write(protocol.CONNECT_HEADER)
  198. // Generate length
  199. writeVarByteInt(stream, length)
  200. // Generate protocol ID
  201. writeStringOrBuffer(stream, protocolId)
  202. if (settings.bridgeMode) {
  203. protocolVersion += 128
  204. }
  205. stream.write(
  206. protocolVersion === 131
  207. ? protocol.VERSION131
  208. : protocolVersion === 132
  209. ? protocol.VERSION132
  210. : protocolVersion === 4
  211. ? protocol.VERSION4
  212. : protocolVersion === 5
  213. ? protocol.VERSION5
  214. : protocol.VERSION3
  215. )
  216. // Connect flags
  217. let flags = 0
  218. flags |= (username != null) ? protocol.USERNAME_MASK : 0
  219. flags |= (password != null) ? protocol.PASSWORD_MASK : 0
  220. flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
  221. flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
  222. flags |= will ? protocol.WILL_FLAG_MASK : 0
  223. flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
  224. stream.write(Buffer.from([flags]))
  225. // Keepalive
  226. writeNumber(stream, keepalive)
  227. // Properties
  228. if (protocolVersion === 5) {
  229. propertiesData.write()
  230. }
  231. // Client ID
  232. writeStringOrBuffer(stream, clientId)
  233. // Will
  234. if (will) {
  235. if (protocolVersion === 5) {
  236. willProperties.write()
  237. }
  238. writeString(stream, will.topic)
  239. writeStringOrBuffer(stream, will.payload)
  240. }
  241. // Username and password
  242. if (username != null) {
  243. writeStringOrBuffer(stream, username)
  244. }
  245. if (password != null) {
  246. writeStringOrBuffer(stream, password)
  247. }
  248. // This is a small packet that happens only once on a stream
  249. // We assume the stream is always free to receive more data after this
  250. return true
  251. }
  252. function connack (packet, stream, opts) {
  253. const version = opts ? opts.protocolVersion : 4
  254. const settings = packet || {}
  255. const rc = version === 5 ? settings.reasonCode : settings.returnCode
  256. const properties = settings.properties
  257. let length = 2 // length of rc and sessionHeader
  258. // Check return code
  259. if (typeof rc !== 'number') {
  260. stream.destroy(new Error('Invalid return code'))
  261. return false
  262. }
  263. // mqtt5 properties
  264. let propertiesData = null
  265. if (version === 5) {
  266. propertiesData = getProperties(stream, properties)
  267. if (!propertiesData) { return false }
  268. length += propertiesData.length
  269. }
  270. stream.write(protocol.CONNACK_HEADER)
  271. // length
  272. writeVarByteInt(stream, length)
  273. stream.write(settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf)
  274. stream.write(Buffer.from([rc]))
  275. if (propertiesData != null) {
  276. propertiesData.write()
  277. }
  278. return true
  279. }
  280. function publish (packet, stream, opts) {
  281. debug('publish: packet: %o', packet)
  282. const version = opts ? opts.protocolVersion : 4
  283. const settings = packet || {}
  284. const qos = settings.qos || 0
  285. const retain = settings.retain ? protocol.RETAIN_MASK : 0
  286. const topic = settings.topic
  287. const payload = settings.payload || empty
  288. const id = settings.messageId
  289. const properties = settings.properties
  290. let length = 0
  291. // Topic must be a non-empty string or Buffer
  292. if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2
  293. else if (Buffer.isBuffer(topic)) length += topic.length + 2
  294. else {
  295. stream.destroy(new Error('Invalid topic'))
  296. return false
  297. }
  298. // Get the payload length
  299. if (!Buffer.isBuffer(payload)) length += Buffer.byteLength(payload)
  300. else length += payload.length
  301. // Message ID must a number if qos > 0
  302. if (qos && typeof id !== 'number') {
  303. stream.destroy(new Error('Invalid messageId'))
  304. return false
  305. } else if (qos) length += 2
  306. // mqtt5 properties
  307. let propertiesData = null
  308. if (version === 5) {
  309. propertiesData = getProperties(stream, properties)
  310. if (!propertiesData) { return false }
  311. length += propertiesData.length
  312. }
  313. // Header
  314. stream.write(protocol.PUBLISH_HEADER[qos][settings.dup ? 1 : 0][retain ? 1 : 0])
  315. // Remaining length
  316. writeVarByteInt(stream, length)
  317. // Topic
  318. writeNumber(stream, byteLength(topic))
  319. stream.write(topic)
  320. // Message ID
  321. if (qos > 0) writeNumber(stream, id)
  322. // Properties
  323. if (propertiesData != null) {
  324. propertiesData.write()
  325. }
  326. // Payload
  327. debug('publish: payload: %o', payload)
  328. return stream.write(payload)
  329. }
  330. /* Puback, pubrec, pubrel and pubcomp */
  331. function confirmation (packet, stream, opts) {
  332. const version = opts ? opts.protocolVersion : 4
  333. const settings = packet || {}
  334. const type = settings.cmd || 'puback'
  335. const id = settings.messageId
  336. const dup = (settings.dup && type === 'pubrel') ? protocol.DUP_MASK : 0
  337. let qos = 0
  338. const reasonCode = settings.reasonCode
  339. const properties = settings.properties
  340. let length = version === 5 ? 3 : 2
  341. if (type === 'pubrel') qos = 1
  342. // Check message ID
  343. if (typeof id !== 'number') {
  344. stream.destroy(new Error('Invalid messageId'))
  345. return false
  346. }
  347. // properies mqtt 5
  348. let propertiesData = null
  349. if (version === 5) {
  350. // Confirm should not add empty property length with no properties (rfc 3.4.2.2.1)
  351. if (typeof properties === 'object') {
  352. propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  353. if (!propertiesData) { return false }
  354. length += propertiesData.length
  355. }
  356. }
  357. // Header
  358. stream.write(protocol.ACKS[type][qos][dup][0])
  359. // Length === 3 is only true of version === 5 and no properties; therefore if reasonCode === 0 we are allowed to skip both bytes - but if we write the reason code we also have to write property length [MQTT-3.4.2-1].
  360. if (length === 3) length += reasonCode !== 0 ? 1 : -1
  361. writeVarByteInt(stream, length)
  362. // Message ID
  363. writeNumber(stream, id)
  364. // reason code in header - but only if it couldn't be omitted - indicated by length !== 2.
  365. if (version === 5 && length !== 2) {
  366. stream.write(Buffer.from([reasonCode]))
  367. }
  368. // properties mqtt 5
  369. if (propertiesData !== null) {
  370. propertiesData.write()
  371. } else {
  372. if (length === 4) {
  373. // we have no properties but have written a reason code - so we need to indicate empty properties by filling in a zero.
  374. stream.write(Buffer.from([0]))
  375. }
  376. }
  377. return true
  378. }
  379. function subscribe (packet, stream, opts) {
  380. debug('subscribe: packet: ')
  381. const version = opts ? opts.protocolVersion : 4
  382. const settings = packet || {}
  383. const dup = settings.dup ? protocol.DUP_MASK : 0
  384. const id = settings.messageId
  385. const subs = settings.subscriptions
  386. const properties = settings.properties
  387. let length = 0
  388. // Check message ID
  389. if (typeof id !== 'number') {
  390. stream.destroy(new Error('Invalid messageId'))
  391. return false
  392. } else length += 2
  393. // properies mqtt 5
  394. let propertiesData = null
  395. if (version === 5) {
  396. propertiesData = getProperties(stream, properties)
  397. if (!propertiesData) { return false }
  398. length += propertiesData.length
  399. }
  400. // Check subscriptions
  401. if (typeof subs === 'object' && subs.length) {
  402. for (let i = 0; i < subs.length; i += 1) {
  403. const itopic = subs[i].topic
  404. const iqos = subs[i].qos
  405. if (typeof itopic !== 'string') {
  406. stream.destroy(new Error('Invalid subscriptions - invalid topic'))
  407. return false
  408. }
  409. if (typeof iqos !== 'number') {
  410. stream.destroy(new Error('Invalid subscriptions - invalid qos'))
  411. return false
  412. }
  413. if (version === 5) {
  414. const nl = subs[i].nl || false
  415. if (typeof nl !== 'boolean') {
  416. stream.destroy(new Error('Invalid subscriptions - invalid No Local'))
  417. return false
  418. }
  419. const rap = subs[i].rap || false
  420. if (typeof rap !== 'boolean') {
  421. stream.destroy(new Error('Invalid subscriptions - invalid Retain as Published'))
  422. return false
  423. }
  424. const rh = subs[i].rh || 0
  425. if (typeof rh !== 'number' || rh > 2) {
  426. stream.destroy(new Error('Invalid subscriptions - invalid Retain Handling'))
  427. return false
  428. }
  429. }
  430. length += Buffer.byteLength(itopic) + 2 + 1
  431. }
  432. } else {
  433. stream.destroy(new Error('Invalid subscriptions'))
  434. return false
  435. }
  436. // Generate header
  437. debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER)
  438. stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])
  439. // Generate length
  440. writeVarByteInt(stream, length)
  441. // Generate message ID
  442. writeNumber(stream, id)
  443. // properies mqtt 5
  444. if (propertiesData !== null) {
  445. propertiesData.write()
  446. }
  447. let result = true
  448. // Generate subs
  449. for (const sub of subs) {
  450. const jtopic = sub.topic
  451. const jqos = sub.qos
  452. const jnl = +sub.nl
  453. const jrap = +sub.rap
  454. const jrh = sub.rh
  455. let joptions
  456. // Write topic string
  457. writeString(stream, jtopic)
  458. // options process
  459. joptions = protocol.SUBSCRIBE_OPTIONS_QOS[jqos]
  460. if (version === 5) {
  461. joptions |= jnl ? protocol.SUBSCRIBE_OPTIONS_NL : 0
  462. joptions |= jrap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0
  463. joptions |= jrh ? protocol.SUBSCRIBE_OPTIONS_RH[jrh] : 0
  464. }
  465. // Write options
  466. result = stream.write(Buffer.from([joptions]))
  467. }
  468. return result
  469. }
  470. function suback (packet, stream, opts) {
  471. const version = opts ? opts.protocolVersion : 4
  472. const settings = packet || {}
  473. const id = settings.messageId
  474. const granted = settings.granted
  475. const properties = settings.properties
  476. let length = 0
  477. // Check message ID
  478. if (typeof id !== 'number') {
  479. stream.destroy(new Error('Invalid messageId'))
  480. return false
  481. } else length += 2
  482. // Check granted qos vector
  483. if (typeof granted === 'object' && granted.length) {
  484. for (let i = 0; i < granted.length; i += 1) {
  485. if (typeof granted[i] !== 'number') {
  486. stream.destroy(new Error('Invalid qos vector'))
  487. return false
  488. }
  489. length += 1
  490. }
  491. } else {
  492. stream.destroy(new Error('Invalid qos vector'))
  493. return false
  494. }
  495. // properies mqtt 5
  496. let propertiesData = null
  497. if (version === 5) {
  498. propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  499. if (!propertiesData) { return false }
  500. length += propertiesData.length
  501. }
  502. // header
  503. stream.write(protocol.SUBACK_HEADER)
  504. // Length
  505. writeVarByteInt(stream, length)
  506. // Message ID
  507. writeNumber(stream, id)
  508. // properies mqtt 5
  509. if (propertiesData !== null) {
  510. propertiesData.write()
  511. }
  512. return stream.write(Buffer.from(granted))
  513. }
  514. function unsubscribe (packet, stream, opts) {
  515. const version = opts ? opts.protocolVersion : 4
  516. const settings = packet || {}
  517. const id = settings.messageId
  518. const dup = settings.dup ? protocol.DUP_MASK : 0
  519. const unsubs = settings.unsubscriptions
  520. const properties = settings.properties
  521. let length = 0
  522. // Check message ID
  523. if (typeof id !== 'number') {
  524. stream.destroy(new Error('Invalid messageId'))
  525. return false
  526. } else {
  527. length += 2
  528. }
  529. // Check unsubs
  530. if (typeof unsubs === 'object' && unsubs.length) {
  531. for (let i = 0; i < unsubs.length; i += 1) {
  532. if (typeof unsubs[i] !== 'string') {
  533. stream.destroy(new Error('Invalid unsubscriptions'))
  534. return false
  535. }
  536. length += Buffer.byteLength(unsubs[i]) + 2
  537. }
  538. } else {
  539. stream.destroy(new Error('Invalid unsubscriptions'))
  540. return false
  541. }
  542. // properies mqtt 5
  543. let propertiesData = null
  544. if (version === 5) {
  545. propertiesData = getProperties(stream, properties)
  546. if (!propertiesData) { return false }
  547. length += propertiesData.length
  548. }
  549. // Header
  550. stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])
  551. // Length
  552. writeVarByteInt(stream, length)
  553. // Message ID
  554. writeNumber(stream, id)
  555. // properies mqtt 5
  556. if (propertiesData !== null) {
  557. propertiesData.write()
  558. }
  559. // Unsubs
  560. let result = true
  561. for (let j = 0; j < unsubs.length; j++) {
  562. result = writeString(stream, unsubs[j])
  563. }
  564. return result
  565. }
  566. function unsuback (packet, stream, opts) {
  567. const version = opts ? opts.protocolVersion : 4
  568. const settings = packet || {}
  569. const id = settings.messageId
  570. const dup = settings.dup ? protocol.DUP_MASK : 0
  571. const granted = settings.granted
  572. const properties = settings.properties
  573. const type = settings.cmd
  574. const qos = 0
  575. let length = 2
  576. // Check message ID
  577. if (typeof id !== 'number') {
  578. stream.destroy(new Error('Invalid messageId'))
  579. return false
  580. }
  581. // Check granted
  582. if (version === 5) {
  583. if (typeof granted === 'object' && granted.length) {
  584. for (let i = 0; i < granted.length; i += 1) {
  585. if (typeof granted[i] !== 'number') {
  586. stream.destroy(new Error('Invalid qos vector'))
  587. return false
  588. }
  589. length += 1
  590. }
  591. } else {
  592. stream.destroy(new Error('Invalid qos vector'))
  593. return false
  594. }
  595. }
  596. // properies mqtt 5
  597. let propertiesData = null
  598. if (version === 5) {
  599. propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  600. if (!propertiesData) { return false }
  601. length += propertiesData.length
  602. }
  603. // Header
  604. stream.write(protocol.ACKS[type][qos][dup][0])
  605. // Length
  606. writeVarByteInt(stream, length)
  607. // Message ID
  608. writeNumber(stream, id)
  609. // properies mqtt 5
  610. if (propertiesData !== null) {
  611. propertiesData.write()
  612. }
  613. // payload
  614. if (version === 5) {
  615. stream.write(Buffer.from(granted))
  616. }
  617. return true
  618. }
  619. function emptyPacket (packet, stream, opts) {
  620. return stream.write(protocol.EMPTY[packet.cmd])
  621. }
  622. function disconnect (packet, stream, opts) {
  623. const version = opts ? opts.protocolVersion : 4
  624. const settings = packet || {}
  625. const reasonCode = settings.reasonCode
  626. const properties = settings.properties
  627. let length = version === 5 ? 1 : 0
  628. // properies mqtt 5
  629. let propertiesData = null
  630. if (version === 5) {
  631. propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  632. if (!propertiesData) { return false }
  633. length += propertiesData.length
  634. }
  635. // Header
  636. stream.write(Buffer.from([protocol.codes.disconnect << 4]))
  637. // Length
  638. writeVarByteInt(stream, length)
  639. // reason code in header
  640. if (version === 5) {
  641. stream.write(Buffer.from([reasonCode]))
  642. }
  643. // properies mqtt 5
  644. if (propertiesData !== null) {
  645. propertiesData.write()
  646. }
  647. return true
  648. }
  649. function auth (packet, stream, opts) {
  650. const version = opts ? opts.protocolVersion : 4
  651. const settings = packet || {}
  652. const reasonCode = settings.reasonCode
  653. const properties = settings.properties
  654. let length = version === 5 ? 1 : 0
  655. if (version !== 5) stream.destroy(new Error('Invalid mqtt version for auth packet'))
  656. // properies mqtt 5
  657. const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  658. if (!propertiesData) { return false }
  659. length += propertiesData.length
  660. // Header
  661. stream.write(Buffer.from([protocol.codes.auth << 4]))
  662. // Length
  663. writeVarByteInt(stream, length)
  664. // reason code in header
  665. stream.write(Buffer.from([reasonCode]))
  666. // properies mqtt 5
  667. if (propertiesData !== null) {
  668. propertiesData.write()
  669. }
  670. return true
  671. }
  672. /**
  673. * writeVarByteInt - write an MQTT style variable byte integer to the buffer
  674. *
  675. * @param <Buffer> buffer - destination
  676. * @param <Number> pos - offset
  677. * @param <Number> length - length (>0)
  678. * @returns <Number> number of bytes written
  679. *
  680. * @api private
  681. */
  682. const varByteIntCache = {}
  683. function writeVarByteInt (stream, num) {
  684. if (num > protocol.VARBYTEINT_MAX) {
  685. stream.destroy(new Error(`Invalid variable byte integer: ${num}`))
  686. return false
  687. }
  688. let buffer = varByteIntCache[num]
  689. if (!buffer) {
  690. buffer = genBufVariableByteInt(num)
  691. if (num < 16384) varByteIntCache[num] = buffer
  692. }
  693. debug('writeVarByteInt: writing to stream: %o', buffer)
  694. return stream.write(buffer)
  695. }
  696. /**
  697. * writeString - write a utf8 string to the buffer
  698. *
  699. * @param <Buffer> buffer - destination
  700. * @param <Number> pos - offset
  701. * @param <String> string - string to write
  702. * @return <Number> number of bytes written
  703. *
  704. * @api private
  705. */
  706. function writeString (stream, string) {
  707. const strlen = Buffer.byteLength(string)
  708. writeNumber(stream, strlen)
  709. debug('writeString: %s', string)
  710. return stream.write(string, 'utf8')
  711. }
  712. /**
  713. * writeStringPair - write a utf8 string pairs to the buffer
  714. *
  715. * @param <Buffer> buffer - destination
  716. * @param <String> name - string name to write
  717. * @param <String> value - string value to write
  718. * @return <Number> number of bytes written
  719. *
  720. * @api private
  721. */
  722. function writeStringPair (stream, name, value) {
  723. writeString(stream, name)
  724. writeString(stream, value)
  725. }
  726. /**
  727. * writeNumber - write a two byte number to the buffer
  728. *
  729. * @param <Buffer> buffer - destination
  730. * @param <Number> pos - offset
  731. * @param <String> number - number to write
  732. * @return <Number> number of bytes written
  733. *
  734. * @api private
  735. */
  736. function writeNumberCached (stream, number) {
  737. debug('writeNumberCached: number: %d', number)
  738. debug('writeNumberCached: %o', numCache[number])
  739. return stream.write(numCache[number])
  740. }
  741. function writeNumberGenerated (stream, number) {
  742. const generatedNumber = generateNumber(number)
  743. debug('writeNumberGenerated: %o', generatedNumber)
  744. return stream.write(generatedNumber)
  745. }
  746. function write4ByteNumber (stream, number) {
  747. const generated4ByteBuffer = generate4ByteBuffer(number)
  748. debug('write4ByteNumber: %o', generated4ByteBuffer)
  749. return stream.write(generated4ByteBuffer)
  750. }
  751. /**
  752. * writeStringOrBuffer - write a String or Buffer with the its length prefix
  753. *
  754. * @param <Buffer> buffer - destination
  755. * @param <Number> pos - offset
  756. * @param <String> toWrite - String or Buffer
  757. * @return <Number> number of bytes written
  758. */
  759. function writeStringOrBuffer (stream, toWrite) {
  760. if (typeof toWrite === 'string') {
  761. writeString(stream, toWrite)
  762. } else if (toWrite) {
  763. writeNumber(stream, toWrite.length)
  764. stream.write(toWrite)
  765. } else writeNumber(stream, 0)
  766. }
  767. function getProperties (stream, properties) {
  768. /* connect properties */
  769. if (typeof properties !== 'object' || properties.length != null) {
  770. return {
  771. length: 1,
  772. write () {
  773. writeProperties(stream, {}, 0)
  774. }
  775. }
  776. }
  777. let propertiesLength = 0
  778. function getLengthProperty (name, value) {
  779. const type = protocol.propertiesTypes[name]
  780. let length = 0
  781. switch (type) {
  782. case 'byte': {
  783. if (typeof value !== 'boolean') {
  784. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  785. return false
  786. }
  787. length += 1 + 1
  788. break
  789. }
  790. case 'int8': {
  791. if (typeof value !== 'number' || value < 0 || value > 0xff) {
  792. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  793. return false
  794. }
  795. length += 1 + 1
  796. break
  797. }
  798. case 'binary': {
  799. if (value && value === null) {
  800. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  801. return false
  802. }
  803. length += 1 + Buffer.byteLength(value) + 2
  804. break
  805. }
  806. case 'int16': {
  807. if (typeof value !== 'number' || value < 0 || value > 0xffff) {
  808. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  809. return false
  810. }
  811. length += 1 + 2
  812. break
  813. }
  814. case 'int32': {
  815. if (typeof value !== 'number' || value < 0 || value > 0xffffffff) {
  816. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  817. return false
  818. }
  819. length += 1 + 4
  820. break
  821. }
  822. case 'var': {
  823. // var byte integer is max 24 bits packed in 32 bits
  824. if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) {
  825. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  826. return false
  827. }
  828. length += 1 + Buffer.byteLength(genBufVariableByteInt(value))
  829. break
  830. }
  831. case 'string': {
  832. if (typeof value !== 'string') {
  833. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  834. return false
  835. }
  836. length += 1 + 2 + Buffer.byteLength(value.toString())
  837. break
  838. }
  839. case 'pair': {
  840. if (typeof value !== 'object') {
  841. stream.destroy(new Error(`Invalid ${name}: ${value}`))
  842. return false
  843. }
  844. length += Object.getOwnPropertyNames(value).reduce((result, name) => {
  845. const currentValue = value[name]
  846. if (Array.isArray(currentValue)) {
  847. result += currentValue.reduce((currentLength, value) => {
  848. currentLength += 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(value.toString())
  849. return currentLength
  850. }, 0)
  851. } else {
  852. result += 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(value[name].toString())
  853. }
  854. return result
  855. }, 0)
  856. break
  857. }
  858. default: {
  859. stream.destroy(new Error(`Invalid property ${name}: ${value}`))
  860. return false
  861. }
  862. }
  863. return length
  864. }
  865. if (properties) {
  866. for (const propName in properties) {
  867. let propLength = 0
  868. let propValueLength = 0
  869. const propValue = properties[propName]
  870. if (propValue === undefined) {
  871. continue
  872. } else if (Array.isArray(propValue)) {
  873. for (let valueIndex = 0; valueIndex < propValue.length; valueIndex++) {
  874. propValueLength = getLengthProperty(propName, propValue[valueIndex])
  875. if (!propValueLength) { return false }
  876. propLength += propValueLength
  877. }
  878. } else {
  879. propValueLength = getLengthProperty(propName, propValue)
  880. if (!propValueLength) { return false }
  881. propLength = propValueLength
  882. }
  883. if (!propLength) return false
  884. propertiesLength += propLength
  885. }
  886. }
  887. const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength))
  888. return {
  889. length: propertiesLengthLength + propertiesLength,
  890. write () {
  891. writeProperties(stream, properties, propertiesLength)
  892. }
  893. }
  894. }
  895. function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {
  896. const mayEmptyProps = ['reasonString', 'userProperties']
  897. const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0
  898. let propertiesData = getProperties(stream, properties)
  899. if (maximumPacketSize) {
  900. while (length + propertiesData.length > maximumPacketSize) {
  901. const currentMayEmptyProp = mayEmptyProps.shift()
  902. if (currentMayEmptyProp && properties[currentMayEmptyProp]) {
  903. delete properties[currentMayEmptyProp]
  904. propertiesData = getProperties(stream, properties)
  905. } else {
  906. return false
  907. }
  908. }
  909. }
  910. return propertiesData
  911. }
  912. function writeProperty (stream, propName, value) {
  913. const type = protocol.propertiesTypes[propName]
  914. switch (type) {
  915. case 'byte': {
  916. stream.write(Buffer.from([protocol.properties[propName]]))
  917. stream.write(Buffer.from([+value]))
  918. break
  919. }
  920. case 'int8': {
  921. stream.write(Buffer.from([protocol.properties[propName]]))
  922. stream.write(Buffer.from([value]))
  923. break
  924. }
  925. case 'binary': {
  926. stream.write(Buffer.from([protocol.properties[propName]]))
  927. writeStringOrBuffer(stream, value)
  928. break
  929. }
  930. case 'int16': {
  931. stream.write(Buffer.from([protocol.properties[propName]]))
  932. writeNumber(stream, value)
  933. break
  934. }
  935. case 'int32': {
  936. stream.write(Buffer.from([protocol.properties[propName]]))
  937. write4ByteNumber(stream, value)
  938. break
  939. }
  940. case 'var': {
  941. stream.write(Buffer.from([protocol.properties[propName]]))
  942. writeVarByteInt(stream, value)
  943. break
  944. }
  945. case 'string': {
  946. stream.write(Buffer.from([protocol.properties[propName]]))
  947. writeString(stream, value)
  948. break
  949. }
  950. case 'pair': {
  951. Object.getOwnPropertyNames(value).forEach(name => {
  952. const currentValue = value[name]
  953. if (Array.isArray(currentValue)) {
  954. currentValue.forEach(value => {
  955. stream.write(Buffer.from([protocol.properties[propName]]))
  956. writeStringPair(stream, name.toString(), value.toString())
  957. })
  958. } else {
  959. stream.write(Buffer.from([protocol.properties[propName]]))
  960. writeStringPair(stream, name.toString(), currentValue.toString())
  961. }
  962. })
  963. break
  964. }
  965. default: {
  966. stream.destroy(new Error(`Invalid property ${propName} value: ${value}`))
  967. return false
  968. }
  969. }
  970. }
  971. function writeProperties (stream, properties, propertiesLength) {
  972. /* write properties to stream */
  973. writeVarByteInt(stream, propertiesLength)
  974. for (const propName in properties) {
  975. if (Object.prototype.hasOwnProperty.call(properties, propName) && properties[propName] != null) {
  976. const value = properties[propName]
  977. if (Array.isArray(value)) {
  978. for (let valueIndex = 0; valueIndex < value.length; valueIndex++) {
  979. writeProperty(stream, propName, value[valueIndex])
  980. }
  981. } else {
  982. writeProperty(stream, propName, value)
  983. }
  984. }
  985. }
  986. }
  987. function byteLength (bufOrString) {
  988. if (!bufOrString) return 0
  989. else if (bufOrString instanceof Buffer) return bufOrString.length
  990. else return Buffer.byteLength(bufOrString)
  991. }
  992. function isStringOrBuffer (field) {
  993. return typeof field === 'string' || field instanceof Buffer
  994. }
  995. module.exports = generate