connect.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. 'use strict'
  2. const { test } = require('tap')
  3. const http = require('http')
  4. const ws = require('websocket-stream')
  5. const mqtt = require('mqtt')
  6. const { setup, connect, delay } = require('./helper')
  7. const aedes = require('../')
  8. ;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) {
  9. test('connect and connack (minimal)', function (t) {
  10. t.plan(2)
  11. const s = setup()
  12. t.teardown(s.broker.close.bind(s.broker))
  13. s.inStream.write({
  14. cmd: 'connect',
  15. protocolId: ele.id,
  16. protocolVersion: ele.ver,
  17. clean: true,
  18. clientId: 'my-client',
  19. keepalive: 0
  20. })
  21. s.outStream.on('data', function (packet) {
  22. t.same(packet, {
  23. cmd: 'connack',
  24. returnCode: 0,
  25. length: 2,
  26. qos: 0,
  27. retain: false,
  28. dup: false,
  29. topic: null,
  30. payload: null,
  31. sessionPresent: false
  32. }, 'successful connack')
  33. t.equal(s.client.version, ele.ver)
  34. })
  35. })
  36. })
  37. // [MQTT-3.1.2-2]
  38. test('reject client requested for unacceptable protocol version', function (t) {
  39. t.plan(4)
  40. const broker = aedes()
  41. t.teardown(broker.close.bind(broker))
  42. const s = setup(broker)
  43. s.inStream.write({
  44. cmd: 'connect',
  45. protocolId: 'MQIsdp',
  46. protocolVersion: 5,
  47. clean: true,
  48. clientId: 'my-client',
  49. keepalive: 0
  50. })
  51. s.outStream.on('data', function (packet) {
  52. t.equal(packet.cmd, 'connack')
  53. t.equal(packet.returnCode, 1, 'unacceptable protocol version')
  54. t.equal(broker.connectedClients, 0)
  55. })
  56. broker.on('clientError', function (client, err) {
  57. t.fail('should not raise clientError error')
  58. })
  59. broker.on('connectionError', function (client, err) {
  60. t.equal(err.message, 'unacceptable protocol version')
  61. })
  62. })
  63. // [MQTT-3.1.2-1], Guarded in mqtt-packet
  64. test('reject client requested for unsupported protocol version', function (t) {
  65. t.plan(3)
  66. const broker = aedes()
  67. t.teardown(broker.close.bind(broker))
  68. const s = setup(broker)
  69. s.inStream.write({
  70. cmd: 'connect',
  71. protocolId: 'MQTT',
  72. protocolVersion: 2,
  73. clean: true,
  74. clientId: 'my-client',
  75. keepalive: 0
  76. })
  77. s.outStream.on('data', function (packet) {
  78. t.fail('no data sent')
  79. })
  80. broker.on('connectionError', function (client, err) {
  81. t.equal(client.version, null)
  82. t.equal(err.message, 'Invalid protocol version')
  83. t.equal(broker.connectedClients, 0)
  84. })
  85. })
  86. test('reject clients that exceed the keepalive limit', function (t) {
  87. t.plan(3)
  88. const broker = aedes({
  89. keepaliveLimit: 100
  90. })
  91. t.teardown(broker.close.bind(broker))
  92. const s = setup(broker)
  93. s.inStream.write({
  94. cmd: 'connect',
  95. keepalive: 150
  96. })
  97. s.outStream.on('data', function (packet) {
  98. console.log(packet)
  99. t.same(packet, {
  100. cmd: 'connack',
  101. returnCode: 6,
  102. length: 2,
  103. qos: 0,
  104. retain: false,
  105. dup: false,
  106. topic: null,
  107. payload: null,
  108. sessionPresent: false
  109. }, 'unsuccessful connack, keep alive limit exceeded')
  110. })
  111. broker.on('connectionError', function (client, err) {
  112. t.equal(err.message, 'keep alive limit exceeded')
  113. t.equal(broker.connectedClients, 0)
  114. })
  115. })
  116. // Guarded in mqtt-packet
  117. test('reject clients with no clientId running on MQTT 3.1.0', function (t) {
  118. t.plan(3)
  119. const broker = aedes()
  120. t.teardown(broker.close.bind(broker))
  121. const s = setup(broker)
  122. s.inStream.write({
  123. cmd: 'connect',
  124. protocolId: 'MQIsdp',
  125. protocolVersion: 3,
  126. clean: true,
  127. keepalive: 0
  128. })
  129. s.outStream.on('data', function (packet) {
  130. t.fail('no data sent')
  131. })
  132. broker.on('connectionError', function (client, err) {
  133. t.equal(client.version, null)
  134. t.equal(err.message, 'clientId must be supplied before 3.1.1')
  135. t.equal(broker.connectedClients, 0)
  136. })
  137. })
  138. // [MQTT-3.1.3-7], Guarded in mqtt-packet
  139. test('reject clients without clientid and clean=false on MQTT 3.1.1', function (t) {
  140. t.plan(2)
  141. const broker = aedes()
  142. t.teardown(broker.close.bind(broker))
  143. const s = setup(broker)
  144. s.inStream.write({
  145. cmd: 'connect',
  146. protocolId: 'MQTT',
  147. protocolVersion: 4,
  148. clean: false,
  149. clientId: '',
  150. keepalive: 0
  151. })
  152. s.outStream.on('data', function (packet) {
  153. t.fail('no data sent')
  154. })
  155. broker.on('connectionError', function (client, err) {
  156. t.equal(err.message, 'clientId must be given if cleanSession set to 0')
  157. t.equal(broker.connectedClients, 0)
  158. })
  159. })
  160. test('clients without clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) {
  161. t.plan(5)
  162. const broker = aedes()
  163. t.teardown(broker.close.bind(broker))
  164. const s = setup(broker)
  165. s.inStream.write({
  166. cmd: 'connect',
  167. protocolId: 'MQTT',
  168. protocolVersion: 4,
  169. clean: true,
  170. keepalive: 0
  171. })
  172. s.outStream.on('data', function (packet) {
  173. t.equal(packet.cmd, 'connack')
  174. t.equal(packet.returnCode, 0)
  175. t.equal(broker.connectedClients, 1)
  176. t.equal(s.client.version, 4)
  177. })
  178. broker.on('connectionError', function (client, err) {
  179. t.error(err, 'no error')
  180. })
  181. broker.on('client', function (client) {
  182. t.ok(client.id.startsWith('aedes_'))
  183. })
  184. })
  185. test('client connect error while fetching subscriptions', function (t) {
  186. t.plan(2)
  187. const broker = aedes()
  188. t.teardown(broker.close.bind(broker))
  189. const s = setup(broker)
  190. broker.persistence.subscriptionsByClient = function (c, cb) {
  191. cb(new Error('error'), [], c)
  192. }
  193. s.inStream.write({
  194. cmd: 'connect',
  195. protocolId: 'MQTT',
  196. protocolVersion: 4,
  197. clean: false,
  198. clientId: 'my-client',
  199. keepalive: 0
  200. })
  201. broker.on('clientError', function (client, err) {
  202. t.equal(client.version, 4)
  203. t.pass('throws error')
  204. })
  205. })
  206. test('client connect clear outgoing', function (t) {
  207. t.plan(1)
  208. const clientId = 'abcde'
  209. const brokerId = 'pippo'
  210. const broker = aedes({ id: brokerId })
  211. t.teardown(broker.close.bind(broker))
  212. const subs = [{ clientId }]
  213. const packet = {
  214. cmd: 'publish',
  215. topic: 'hello',
  216. payload: Buffer.from('world'),
  217. qos: 1,
  218. brokerId,
  219. brokerCounter: 2,
  220. retain: true,
  221. messageId: 42,
  222. dup: false
  223. }
  224. broker.persistence.outgoingEnqueueCombi(subs, packet, function () {
  225. const s = setup(broker)
  226. s.inStream.write({
  227. cmd: 'connect',
  228. protocolId: 'MQTT',
  229. protocolVersion: 4,
  230. clean: true,
  231. clientId,
  232. keepalive: 0
  233. })
  234. broker.on('clientReady', function (client) {
  235. broker.persistence.outgoingUpdate(client, packet, function (err) {
  236. t.equal('no such packet', err.message, 'packet not found')
  237. })
  238. })
  239. })
  240. })
  241. test('clients with zero-byte clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) {
  242. t.plan(5)
  243. const broker = aedes()
  244. t.teardown(broker.close.bind(broker))
  245. const s = setup(broker)
  246. s.inStream.write({
  247. cmd: 'connect',
  248. protocolId: 'MQTT',
  249. protocolVersion: 4,
  250. clean: true,
  251. clientId: '',
  252. keepalive: 0
  253. })
  254. s.outStream.on('data', function (packet) {
  255. t.equal(packet.cmd, 'connack')
  256. t.equal(packet.returnCode, 0)
  257. t.equal(broker.connectedClients, 1)
  258. t.equal(s.client.version, 4)
  259. })
  260. broker.on('connectionError', function (client, err) {
  261. t.error(err, 'no error')
  262. })
  263. broker.on('client', function (client) {
  264. t.ok(client.id.startsWith('aedes_'))
  265. })
  266. })
  267. // [MQTT-3.1.3-7]
  268. test('reject clients with > 23 clientId length in MQTT 3.1.0', function (t) {
  269. t.plan(7)
  270. const broker = aedes()
  271. t.teardown(broker.close.bind(broker))
  272. const s = setup(broker)
  273. const conn = s.client.conn
  274. const end = conn.end
  275. conn.end = function () {
  276. t.fail('should not call `conn.end()`')
  277. end()
  278. }
  279. function drain () {
  280. t.pass('should empty connection request queue')
  281. }
  282. conn._writableState.getBuffer = () => [{ callback: drain }, { callback: drain }]
  283. s.inStream.write({
  284. cmd: 'connect',
  285. protocolId: 'MQIsdp',
  286. protocolVersion: 3,
  287. clean: true,
  288. clientId: 'abcdefghijklmnopqrstuvwxyz',
  289. keepalive: 0
  290. })
  291. s.outStream.on('data', function (packet) {
  292. t.equal(packet.cmd, 'connack')
  293. t.equal(packet.returnCode, 2, 'identifier rejected')
  294. t.equal(broker.connectedClients, 0)
  295. t.equal(s.client.version, null)
  296. })
  297. broker.on('connectionError', function (client, err) {
  298. t.equal(err.message, 'identifier rejected')
  299. })
  300. })
  301. test('connect clients with > 23 clientId length using aedes maxClientsIdLength option in MQTT 3.1.0', function (t) {
  302. t.plan(4)
  303. const broker = aedes({ maxClientsIdLength: 26 })
  304. t.teardown(broker.close.bind(broker))
  305. const s = setup(broker)
  306. s.inStream.write({
  307. cmd: 'connect',
  308. protocolId: 'MQTT',
  309. protocolVersion: 3,
  310. clean: true,
  311. clientId: 'abcdefghijklmnopqrstuvwxyz',
  312. keepalive: 0
  313. })
  314. s.outStream.on('data', function (packet) {
  315. t.equal(packet.cmd, 'connack')
  316. t.equal(packet.returnCode, 0)
  317. t.equal(broker.connectedClients, 1)
  318. t.equal(s.client.version, 3)
  319. })
  320. broker.on('connectionError', function (client, err) {
  321. t.error(err, 'no error')
  322. })
  323. })
  324. test('connect with > 23 clientId length in MQTT 3.1.1', function (t) {
  325. t.plan(4)
  326. const broker = aedes()
  327. t.teardown(broker.close.bind(broker))
  328. const s = setup(broker)
  329. s.inStream.write({
  330. cmd: 'connect',
  331. protocolId: 'MQTT',
  332. protocolVersion: 4,
  333. clean: true,
  334. clientId: 'abcdefghijklmnopqrstuvwxyz',
  335. keepalive: 0
  336. })
  337. s.outStream.on('data', function (packet) {
  338. t.equal(packet.cmd, 'connack')
  339. t.equal(packet.returnCode, 0)
  340. t.equal(broker.connectedClients, 1)
  341. t.equal(s.client.version, 4)
  342. })
  343. broker.on('connectionError', function (client, err) {
  344. t.error(err, 'no error')
  345. })
  346. })
  347. // [MQTT-3.1.0-1]
  348. test('the first Packet MUST be a CONNECT Packet', function (t) {
  349. t.plan(2)
  350. const broker = aedes()
  351. t.teardown(broker.close.bind(broker))
  352. const packet = {
  353. cmd: 'publish',
  354. topic: 'hello',
  355. payload: Buffer.from('world'),
  356. qos: 0,
  357. retain: false
  358. }
  359. const s = setup(broker)
  360. s.inStream.write(packet)
  361. broker.on('connectionError', function (client, err) {
  362. t.equal(err.message, 'Invalid protocol')
  363. })
  364. setImmediate(() => {
  365. t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT')
  366. s.conn.destroy()
  367. })
  368. })
  369. // [MQTT-3.1.0-2]
  370. test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client', function (t) {
  371. t.plan(4)
  372. const broker = aedes()
  373. t.teardown(broker.close.bind(broker))
  374. const packet = {
  375. cmd: 'connect',
  376. protocolId: 'MQTT',
  377. protocolVersion: 4,
  378. clean: true,
  379. clientId: 'my-client',
  380. keepalive: 0
  381. }
  382. broker.on('clientError', function (client, err) {
  383. t.equal(err.message, 'Invalid protocol')
  384. })
  385. const s = connect(setup(broker), { clientId: 'abcde' })
  386. s.broker.on('clientReady', function () {
  387. t.ok(broker.clients.abcde.connected)
  388. // destory client when there is a 2nd cmd:connect, even the clientId is dfferent
  389. s.inStream.write(packet)
  390. setImmediate(() => {
  391. t.equal(broker.clients.abcde, undefined, 'client instance is removed')
  392. t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established')
  393. })
  394. })
  395. })
  396. test('connect handler calls done when preConnect throws error', function (t) {
  397. t.plan(1)
  398. const broker = aedes({
  399. preConnect: function (client, packet, done) {
  400. done(Error('error in preconnect'))
  401. }
  402. })
  403. t.teardown(broker.close.bind(broker))
  404. const s = setup(broker)
  405. const handleConnect = require('../lib/handlers/connect')
  406. handleConnect(s.client, {}, function done (err) {
  407. t.equal(err.message, 'error in preconnect', 'calls done with error')
  408. })
  409. })
  410. test('handler calls done when disconnect or unknown packet cmd is received', function (t) {
  411. t.plan(2)
  412. const broker = aedes()
  413. t.teardown(broker.close.bind(broker))
  414. const s = setup(broker)
  415. const handle = require('../lib/handlers/index')
  416. handle(s.client, { cmd: 'disconnect' }, function done () {
  417. t.pass('calls done when disconnect cmd is received')
  418. })
  419. handle(s.client, { cmd: 'fsfadgragae' }, function done () {
  420. t.pass('calls done when unknown cmd is received')
  421. })
  422. })
  423. test('reject second CONNECT Packet sent while first CONNECT still in preConnect stage', function (t) {
  424. t.plan(2)
  425. const packet1 = {
  426. cmd: 'connect',
  427. protocolId: 'MQTT',
  428. protocolVersion: 4,
  429. clean: true,
  430. clientId: 'my-client-1',
  431. keepalive: 0
  432. }
  433. const packet2 = {
  434. cmd: 'connect',
  435. protocolId: 'MQTT',
  436. protocolVersion: 4,
  437. clean: true,
  438. clientId: 'my-client-2',
  439. keepalive: 0
  440. }
  441. let i = 0
  442. const broker = aedes({
  443. preConnect: function (client, packet, done) {
  444. const ms = i++ === 0 ? 2000 : 500
  445. setTimeout(function () {
  446. done(null, true)
  447. }, ms)
  448. }
  449. })
  450. t.teardown(broker.close.bind(broker))
  451. const s = setup(broker)
  452. broker.on('connectionError', function (client, err) {
  453. t.equal(err.info.clientId, 'my-client-2')
  454. t.equal(err.message, 'Invalid protocol')
  455. })
  456. const msg = async (s, ms, msg) => {
  457. await delay(ms)
  458. s.inStream.write(msg)
  459. }
  460. ;(async () => {
  461. await Promise.all([msg(s, 100, packet1), msg(s, 200, packet2)])
  462. })().catch(
  463. (error) => {
  464. t.fail(error)
  465. }
  466. )
  467. })
  468. // [MQTT-3.1.2-1], Guarded in mqtt-packet
  469. test('reject clients with wrong protocol name', function (t) {
  470. t.plan(2)
  471. const broker = aedes()
  472. t.teardown(broker.close.bind(broker))
  473. const s = setup(broker)
  474. s.inStream.write({
  475. cmd: 'connect',
  476. protocolId: 'MQTT_hello',
  477. protocolVersion: 3,
  478. clean: true,
  479. clientId: 'my-client',
  480. keepalive: 0
  481. })
  482. s.outStream.on('data', function (packet) {
  483. t.fail('no data sent')
  484. })
  485. broker.on('connectionError', function (client, err) {
  486. t.equal(err.message, 'Invalid protocolId')
  487. t.equal(broker.connectedClients, 0)
  488. })
  489. })
  490. test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) {
  491. t.plan(2)
  492. const queueLimit = 50
  493. const broker = aedes({ queueLimit })
  494. t.teardown(broker.close.bind(broker))
  495. const publishP = {
  496. cmd: 'publish',
  497. topic: 'hello',
  498. payload: Buffer.from('world'),
  499. qos: 0,
  500. retain: false
  501. }
  502. const connectP = {
  503. cmd: 'connect',
  504. protocolId: 'MQTT',
  505. protocolVersion: 4,
  506. clean: true,
  507. clientId: 'abcde',
  508. keepalive: 0
  509. }
  510. const s = setup(broker)
  511. s.inStream.write(connectP)
  512. process.once('warning', e => t.fail('Memory leak detected'))
  513. for (let i = 0; i < queueLimit; i++) {
  514. s.inStream.write(publishP)
  515. }
  516. broker.on('client', function (client) {
  517. t.equal(client._parser._queue.length, queueLimit, 'Packets have been queued')
  518. client.once('connected', () => {
  519. t.equal(client._parser._queue, null, 'Queue is empty')
  520. s.conn.destroy()
  521. })
  522. })
  523. })
  524. test('Test queue limit', function (t) {
  525. t.plan(1)
  526. const queueLimit = 50
  527. const broker = aedes({ queueLimit })
  528. t.teardown(broker.close.bind(broker))
  529. const publishP = {
  530. cmd: 'publish',
  531. topic: 'hello',
  532. payload: Buffer.from('world'),
  533. qos: 0,
  534. retain: false
  535. }
  536. const connectP = {
  537. cmd: 'connect',
  538. protocolId: 'MQTT',
  539. protocolVersion: 4,
  540. clean: true,
  541. clientId: 'abcde',
  542. keepalive: 0
  543. }
  544. const s = setup(broker)
  545. s.inStream.write(connectP)
  546. process.once('warning', e => t.fail('Memory leak detected'))
  547. for (let i = 0; i < queueLimit + 1; i++) {
  548. s.inStream.write(publishP)
  549. }
  550. broker.on('connectionError', function (conn, err) {
  551. t.equal(err.message, 'Client queue limit reached', 'Queue error is thrown')
  552. s.conn.destroy()
  553. })
  554. })
  555. ;[['fail with no error msg', 3, null, false], ['succeed with no error msg', 9, null, true], ['fail with error msg', 6, new Error('connection banned'), false], ['succeed with error msg', 6, new Error('connection banned'), true]].forEach(function (ele, idx) {
  556. const title = ele[0]
  557. const plan = ele[1]
  558. const err = ele[2]
  559. const ok = ele[3]
  560. test('preConnect handler - ' + title, function (t) {
  561. t.plan(plan)
  562. const broker = aedes({
  563. preConnect: function (client, packet, done) {
  564. t.ok(client.connecting)
  565. t.notOk(client.connected)
  566. t.equal(client.version, null)
  567. return done(err, ok)
  568. }
  569. })
  570. t.teardown(broker.close.bind(broker))
  571. const s = setup(broker)
  572. s.inStream.write({
  573. cmd: 'connect',
  574. protocolId: 'MQTT',
  575. protocolVersion: 4,
  576. clean: true,
  577. clientId: 'my-client-' + idx,
  578. keepalive: 0
  579. })
  580. broker.on('client', function (client) {
  581. if (ok && !err) {
  582. t.ok(client.connecting)
  583. t.notOk(client.connected)
  584. t.pass('register client ok')
  585. } else {
  586. t.fail('no reach here')
  587. }
  588. })
  589. broker.on('clientReady', function (client) {
  590. t.notOk(client.connecting)
  591. t.ok(client.connected)
  592. t.pass('connect ok')
  593. })
  594. broker.on('clientError', function (client, err) {
  595. t.fail('no client error')
  596. })
  597. broker.on('connectionError', function (client, err) {
  598. if (err) {
  599. t.notOk(client.connecting)
  600. t.notOk(client.connected)
  601. t.equal(err.message, 'connection banned')
  602. } else {
  603. t.fail('no connection error')
  604. }
  605. })
  606. })
  607. })
  608. // websocket-stream based connections
  609. test('websocket clients have access to the request object', function (t) {
  610. t.plan(3)
  611. const port = 4883
  612. const broker = aedes()
  613. broker.on('client', function (client) {
  614. if (client.req) {
  615. t.pass('client request object present')
  616. if (client.req.headers) {
  617. t.equal('sample', client.req.headers['x-test-protocol'])
  618. }
  619. } else {
  620. t.fail('no request object present')
  621. }
  622. })
  623. const server = http.createServer()
  624. ws.createServer({
  625. server
  626. }, broker.handle)
  627. server.listen(port, function (err) {
  628. t.error(err, 'no error')
  629. })
  630. const client = mqtt.connect(`ws://localhost:${port}`, {
  631. wsOptions: {
  632. headers: {
  633. 'X-Test-Protocol': 'sample'
  634. }
  635. }
  636. })
  637. t.teardown(() => {
  638. client.end(true)
  639. broker.close()
  640. server.close()
  641. })
  642. })