Browse Source

初始化项目,更新文档

caner 1 year ago
parent
commit
c69c594950
7 changed files with 228 additions and 2 deletions
  1. 43 2
      README.md
  2. 38 0
      app.js
  3. 21 0
      config.js
  4. 27 0
      package.json
  5. 30 0
      src/database.js
  6. 33 0
      src/mqtt.js
  7. 36 0
      src/routes.js

+ 43 - 2
README.md

@@ -1,3 +1,44 @@
-# service
+# IOT 服务端
+[使用aedes作为MQTT服务](https://github.com/moscajs/aedes#readme)
+[使用fastiry作为http服务](https://github.com/fastify/fastify)
+[使用sqlite3作为数据存储](https://github.com/TryGhost/node-sqlite3)
+服务框架使用http+mqtt+sqlite3,以轻量化为目的。整个原理采用IM群聊方式进行设备管理
+ 
+1. 设备:
+    1. 存储:
+        1. wifi 房间号,设备唯一ID
+    2. 接收:
+        1. 接收来自管理端的配置信息:wifi 房间号,唯一设备ID等
+        2. 接收来自房间的控制信息:执行命令
+    3. 发送:
+        2. 订阅房间号后发送设备状态等相关信息
+    4. 订阅:
+        1. 主动订阅房间号
+        2. 被动订阅管理端(由服务端完成)
+2. 管理端(网页)
+    1. 对房间进行增删改查操作
+    2. 对设备进行相关配置,分配房间号,配置Wi-Fi等
+    3. 对房间中的设备进行增删改查,控制等操作,房间外的设备不允许控制操作
+    4. 设备统计(包括未分配房间的设备,未分配设备只显示状态),在房间中显示各设备状态
+    5. 控制设备时,需要下发设备唯一ID
+3. 服务端
+    1. 主动对连接设备进行管理号的订阅,方便后面统计设备状态,修改wifi 信息等
+    2. 增加http-jwt鉴权
+    3. splite3
+        1. 存储房间,设备,日志等相关数据
+        2. 为管理端提供数据支撑
+    4. mqtt
+        1. 对已连接设备进行主动订阅,并下发唯一设备id: 设备连接时需要提供名称,唯一ID=名称+uuid
+    
+## Run
+```
+1. yarn build
+2. ./server
+```
 
-服务端
+## Debug
+```
+1. yarn
+2. node index.js
+3. node>=18.0.0
+```

+ 38 - 0
app.js

@@ -0,0 +1,38 @@
+const fastify = require('fastify')
+const config = require('./config.js')
+const { initDB } = require('./src/database.js')
+const routes = require('./src/routes.js')
+const { createServer } = require('net')
+const { broker } = require('./src/mqtt.js')
+
+// 初始化Fastify(移除WebSocket相关代码)
+const app = fastify({
+  logger: true,
+  maxParamLength: 256,    // 限制路径参数长度
+  connectionTimeout: 5000,// TCP连接超时
+  keepAliveTimeout: 5000, // 保持连接时间
+  bodyLimit: 1048576      // 请求体限制1MB
+})
+const mqttServer = createServer(broker.handle)
+
+// 初始化数据库
+await initDB()
+
+// 仅注册API路由
+app.register(routes, { prefix: '/api' })
+
+// 启动HTTP服务
+app.listen(config.http, (err) => {
+  if (err) {
+    app.log.error(err)
+    process.exit(1)
+  }
+})
+
+// 启动纯TCP版MQTT服务
+mqttServer.listen(config.http.port, config.http.host, () => {
+  app.log.info(`MQTT TCP服务已启动: tcp://localhost:${config.mqtt.tcp_port}`)
+}).on('error', () => {
+  app.log.error(err)
+  process.exit(1)
+})

+ 21 - 0
config.js

@@ -0,0 +1,21 @@
+export default {
+  http: {
+    port: process.env.HTTP_PORT || 3000,
+    host: '0.0.0.0',
+  },
+  jwt: {
+    secret: 'IOTSERVICES',
+    expiresIn: '1h'
+  },
+  db: {
+    file: './data.db',
+    messages_table: `
+        CREATE TABLE IF NOT EXISTS messages (
+          id INTEGER PRIMARY KEY AUTOINCREMENT,
+          topic TEXT NOT NULL,
+          payload TEXT,
+          client_id TEXT,
+          created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+        )`
+  }
+}

+ 27 - 0
package.json

@@ -0,0 +1,27 @@
+{
+  "name": "server",
+  "version": "1.0.0",
+  "description": "服务端",
+  "author": "Caner",
+  "main": "app.js",
+  "bin": "app.js",
+  "scripts": {
+    "build": "pkg . --compress=GZip"
+  },
+  "license": "ISC",
+  "dependencies": {
+    "aedes": "^0.51.3",
+    "log4js": "^6.9.1",
+    "fastify": "^5.2.2",
+    "sqlite3": "^5.1.7",
+    "jwt": "^0.2.0",
+    "util": "^0.12.5"
+  },
+  "pkg": {
+    "targets": [
+      "node18-linux-arm64"
+    ],
+    "output": "Server",
+    "outputPath": "dist"
+  }
+}

+ 30 - 0
src/database.js

@@ -0,0 +1,30 @@
+const sqlite3 = require('sqlite3')
+const { promisify } = require('util')
+const config = require('../config.js')
+const db = new sqlite3.Database(config.db.file)
+
+// 转换为Promise接口
+db.runAsync = promisify(db.run)
+db.allAsync = promisify(db.all)
+
+// 初始化数据库
+export async function initDB() {
+  await db.runAsync(config.db.messages_table)
+  await db.runAsync('PRAGMA journal_mode = WAL;')
+}
+
+// 消息存储
+export async function saveMessage(packet, clientId) {
+  return db.runAsync(
+    'INSERT INTO messages (topic, payload, client_id) VALUES (?, ?, ?)',
+    [packet.topic, packet.payload.toString(), clientId]
+  )
+}
+
+// 消息查询
+export async function getMessages(limit = 100) {
+  return db.allAsync(
+    'SELECT * FROM messages ORDER BY created_at DESC LIMIT ?',
+    [limit]
+  )
+}

+ 33 - 0
src/mqtt.js

@@ -0,0 +1,33 @@
+const aedes = require('aedes')
+const { saveMessage } = require('./database.js')
+
+const broker = aedes({
+  persistence: aedes.persistence.Memory,
+  authenticate: (client, username, password, done) => {
+    // TODO 主动订阅管理端|设备id=用户名+uuid = 订阅后将返回设备唯一id
+    // const er = client.subscribe({ topic: room, qos: 0 }, e => { return e })
+    // if (!er) {
+    //     rooms[room].push({ room, id: client?.id })
+    //     aedes.publish({ topic: room, qos: 0, payload: Buffer.from(JSON.stringify({ type: 'join', channel: room, from_id: client?.id })) }, e => { return e })
+    //     done(null, true)
+    // } else {
+    //   done(null, false)
+    // }
+    done(null, true)
+  },
+  concurrency: 100,        // 并行处理数
+  heartbeatInterval: 60000 // 心跳间隔
+})
+
+// 消息存储拦截
+broker.on('publish', async (packet, client) => {
+  if (client) {
+    try {
+      await saveMessage(packet, client.id)
+    } catch (err) {
+      broker.emit('error', err)
+    }
+  }
+})
+
+export { broker }

+ 36 - 0
src/routes.js

@@ -0,0 +1,36 @@
+const jwt = require('jsonwebtoken');
+const { getMessages } = require('./databse.js')
+const authenticate = (req, res, next) => {
+    const token = req.headers.authorization?.split(' ')[1];
+    if (!token) return res.sendStatus(401);
+    jwt.verify(token, config.jwt.secret, (err, user) => {
+        if (err) return res.sendStatus(403);
+        req.user = user;
+        next();
+    });
+}
+
+export default async function (fastify) {
+    // 消息查询接口
+    fastify.get('/messages', authenticate, async () => {
+        return getMessages()
+    })
+
+    // 消息发布接口
+    fastify.post('/publish', authenticate, {
+        schema: {
+            body: {
+                type: 'object',
+                required: ['topic', 'message'],
+                properties: {
+                    topic: { type: 'string' },
+                    message: { type: 'string' }
+                }
+            }
+        }
+    }, async (request) => {
+        const { topic, message } = request.body
+        fastify.mqtt.publish(topic, message)
+        return { status: '消息已发布' }
+    })
+}