|
|
@@ -0,0 +1,805 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+OLED系统监控器 - 树莓派4B专用版
|
|
|
+功能:触摸唤醒(GPIO18->1M电阻->金属片->GPIO24)+ 30秒自动熄屏
|
|
|
+"""
|
|
|
+import time
|
|
|
+import socket
|
|
|
+import psutil
|
|
|
+from datetime import datetime
|
|
|
+import RPi.GPIO as GPIO
|
|
|
+import sys
|
|
|
+
|
|
|
+# 全局变量
|
|
|
+DRAW_OBJECT = None
|
|
|
+
|
|
|
+class I2CDevice:
|
|
|
+ def __init__(self, address, busnum=1):
|
|
|
+ import smbus2 # 树莓派4B推荐使用smbus2
|
|
|
+ self._address = address
|
|
|
+ self._bus = smbus2.SMBus(busnum)
|
|
|
+
|
|
|
+ def write8(self, register, value):
|
|
|
+ self._bus.write_byte_data(self._address, register, value)
|
|
|
+
|
|
|
+ def writeList(self, register, data):
|
|
|
+ self._bus.write_i2c_block_data(self._address, register, data)
|
|
|
+
|
|
|
+class SSD1306Final:
|
|
|
+ """SSD1306驱动(树莓派4B优化版)"""
|
|
|
+
|
|
|
+ # SSD1306寄存器地址
|
|
|
+ SETCONTRAST = 0x81
|
|
|
+ DISPLAYALLON_RESUME = 0xA4
|
|
|
+ NORMALDISPLAY = 0xA6
|
|
|
+ DISPLAYOFF = 0xAE
|
|
|
+ DISPLAYON = 0xAF
|
|
|
+ SETDISPLAYOFFSET = 0xD3
|
|
|
+ SETCOMPINS = 0xDA
|
|
|
+ SETVCOMDETECT = 0xDB
|
|
|
+ SETDISPLAYCLOCKDIV = 0xD5
|
|
|
+ SETPRECHARGE = 0xD9
|
|
|
+ SETMULTIPLEX = 0xA8
|
|
|
+ SETSTARTLINE = 0x40
|
|
|
+ MEMORYMODE = 0x20
|
|
|
+ COLUMNADDR = 0x21
|
|
|
+ PAGEADDR = 0x22
|
|
|
+ COMSCANDEC = 0xC8
|
|
|
+ SEGREMAP = 0xA0
|
|
|
+ CHARGEPUMP = 0x8D
|
|
|
+
|
|
|
+ def __init__(self, width=128, height=64, address=0x3C):
|
|
|
+ global DRAW_OBJECT
|
|
|
+
|
|
|
+ print("=" * 60)
|
|
|
+ print("SSD1306Final 初始化开始...")
|
|
|
+ print(f"设备: 树莓派4B专用优化")
|
|
|
+ print(f"分辨率: {width}x{height}")
|
|
|
+ print(f"I2C地址: 0x{address:02X}")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ self.width = width
|
|
|
+ self.height = height
|
|
|
+ self.address = address
|
|
|
+ self._device = None
|
|
|
+ self.buffer = [0] * (self.width * self.height // 8)
|
|
|
+ self.image = None
|
|
|
+ self.fonts = None
|
|
|
+
|
|
|
+ # 1. 检查PIL库
|
|
|
+ self._check_pil()
|
|
|
+
|
|
|
+ # 2. 初始化draw对象
|
|
|
+ self._force_init_draw()
|
|
|
+
|
|
|
+ # 3. 初始化I2C设备(树莓派4B优化)
|
|
|
+ self._init_i2c(address)
|
|
|
+
|
|
|
+ # 4. 初始化SSD1306硬件
|
|
|
+ if self._device:
|
|
|
+ self._initialize_hardware()
|
|
|
+
|
|
|
+ # 5. 加载字体
|
|
|
+ self._load_fonts()
|
|
|
+
|
|
|
+ print("SSD1306Final 初始化完成!")
|
|
|
+ print(f"draw对象状态: {'✅ 可用' if self.draw else '❌ 不可用'}")
|
|
|
+ print(f"I2C设备状态: {'✅ 连接' if self._device else '❌ 未连接'}")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ def _check_pil(self):
|
|
|
+ """检查PIL库"""
|
|
|
+ global DRAW_OBJECT
|
|
|
+
|
|
|
+ try:
|
|
|
+ from PIL import Image, ImageDraw, ImageFont
|
|
|
+ self.pil_available = True
|
|
|
+ self.Image = Image
|
|
|
+ self.ImageDraw = ImageDraw
|
|
|
+ self.ImageFont = ImageFont
|
|
|
+ print(" PIL库检查通过")
|
|
|
+ except ImportError as e:
|
|
|
+ self.pil_available = False
|
|
|
+ print(f" PIL库导入失败: {e}")
|
|
|
+ print(" 正在尝试安装Pillow库...")
|
|
|
+ import subprocess
|
|
|
+ try:
|
|
|
+ subprocess.check_call([sys.executable, "-m", "pip", "install", "pillow"])
|
|
|
+ from PIL import Image, ImageDraw, ImageFont
|
|
|
+ self.pil_available = True
|
|
|
+ self.Image = Image
|
|
|
+ self.ImageDraw = ImageDraw
|
|
|
+ self.ImageFont = ImageFont
|
|
|
+ print(" Pillow库安装成功")
|
|
|
+ except Exception as e2:
|
|
|
+ print(f" Pillow库安装失败: {e2}")
|
|
|
+
|
|
|
+ def _force_init_draw(self):
|
|
|
+ """强制初始化draw对象"""
|
|
|
+ global DRAW_OBJECT
|
|
|
+
|
|
|
+ print(" 初始化draw对象...")
|
|
|
+
|
|
|
+ if self.pil_available and hasattr(self, 'Image') and hasattr(self, 'ImageDraw'):
|
|
|
+ try:
|
|
|
+ self.image = self.Image.new('1', (self.width, self.height))
|
|
|
+ self.draw = self.ImageDraw.Draw(self.image)
|
|
|
+ DRAW_OBJECT = self.draw
|
|
|
+ print(" draw对象创建成功")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" PIL创建draw失败: {e}")
|
|
|
+ self._create_fallback_draw()
|
|
|
+ else:
|
|
|
+ print(" 创建备用draw对象")
|
|
|
+ self._create_fallback_draw()
|
|
|
+
|
|
|
+ if not hasattr(self, 'draw') or self.draw is None:
|
|
|
+ self._create_fallback_draw()
|
|
|
+
|
|
|
+ DRAW_OBJECT = self.draw
|
|
|
+
|
|
|
+ def _create_fallback_draw(self):
|
|
|
+ """创建备用draw对象"""
|
|
|
+ class FallbackDraw:
|
|
|
+ def text(self, *args, **kwargs):
|
|
|
+ print(f" 文本绘制: {args}")
|
|
|
+ def rectangle(self, *args, **kwargs):
|
|
|
+ print(f" 矩形绘制: {args}")
|
|
|
+
|
|
|
+ self.draw = FallbackDraw()
|
|
|
+ print(" 使用备用draw对象")
|
|
|
+
|
|
|
+ def _init_i2c(self, address):
|
|
|
+ """初始化I2C设备(树莓派4B优化)"""
|
|
|
+ print(f"🔌 初始化I2C设备 @ 0x{address:02X}...")
|
|
|
+ try:
|
|
|
+ self._device = I2CDevice(address)
|
|
|
+ print(f" I2C设备连接成功")
|
|
|
+ except Exception as e:
|
|
|
+ self._device = None
|
|
|
+ print(f" I2C设备连接失败: {e}")
|
|
|
+ print(" 请检查I2C是否启用: sudo raspi-config -> 启用I2C")
|
|
|
+
|
|
|
+ def _initialize_hardware(self):
|
|
|
+ """初始化SSD1306硬件"""
|
|
|
+ if not self._device:
|
|
|
+ print(" 跳过硬件初始化")
|
|
|
+ return
|
|
|
+
|
|
|
+ print(" 初始化SSD1306硬件...")
|
|
|
+ try:
|
|
|
+ init_commands = [
|
|
|
+ self.DISPLAYOFF,
|
|
|
+ self.SETDISPLAYCLOCKDIV, 0x80,
|
|
|
+ self.SETMULTIPLEX, 0x3F,
|
|
|
+ self.SETDISPLAYOFFSET, 0x00,
|
|
|
+ self.SETSTARTLINE | 0x00,
|
|
|
+ self.CHARGEPUMP, 0x14,
|
|
|
+ self.MEMORYMODE, 0x00,
|
|
|
+ self.SEGREMAP | 0x01,
|
|
|
+ self.COMSCANDEC,
|
|
|
+ self.SETCOMPINS, 0x12,
|
|
|
+ self.SETCONTRAST, 0xFF,
|
|
|
+ self.SETPRECHARGE, 0xF1,
|
|
|
+ self.SETVCOMDETECT, 0x40,
|
|
|
+ self.DISPLAYALLON_RESUME,
|
|
|
+ self.NORMALDISPLAY,
|
|
|
+ self.DISPLAYON
|
|
|
+ ]
|
|
|
+
|
|
|
+ for cmd in init_commands:
|
|
|
+ self.command(cmd)
|
|
|
+ time.sleep(0.01) # 树莓派4B增加命令间隔
|
|
|
+
|
|
|
+ self.clear()
|
|
|
+ self.display()
|
|
|
+ print(" SSD1306硬件初始化成功")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" SSD1306硬件初始化失败: {e}")
|
|
|
+
|
|
|
+ def _load_fonts(self):
|
|
|
+ """加载字体"""
|
|
|
+ self.fonts = {}
|
|
|
+
|
|
|
+ if not self.pil_available or not hasattr(self, 'ImageFont'):
|
|
|
+ print(" 无法加载字体")
|
|
|
+ return
|
|
|
+
|
|
|
+ print(" 加载字体...")
|
|
|
+
|
|
|
+ # 中文字体路径(树莓派常用路径)
|
|
|
+ font_paths = [
|
|
|
+ '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
|
|
|
+ '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf',
|
|
|
+ '/usr/share/fonts/truetype/freefont/FreeSans.ttf'
|
|
|
+ ]
|
|
|
+
|
|
|
+ font_path = None
|
|
|
+ for path in font_paths:
|
|
|
+ try:
|
|
|
+ self.ImageFont.truetype(path, 12)
|
|
|
+ font_path = path
|
|
|
+ break
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 字体 {path} 不可用: {e}")
|
|
|
+
|
|
|
+ if font_path:
|
|
|
+ print(f" 找到字体: {font_path}")
|
|
|
+ try:
|
|
|
+ self.fonts['small'] = self.ImageFont.truetype(font_path, 12)
|
|
|
+ self.fonts['medium'] = self.ImageFont.truetype(font_path, 14)
|
|
|
+ self.fonts['large'] = self.ImageFont.truetype(font_path, 18)
|
|
|
+ print(" 字体加载完成")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 字体加载失败: {e}")
|
|
|
+ self._load_default_fonts()
|
|
|
+ else:
|
|
|
+ print(" 使用默认字体")
|
|
|
+ self._load_default_fonts()
|
|
|
+
|
|
|
+ def _load_default_fonts(self):
|
|
|
+ """加载默认字体"""
|
|
|
+ if not self.pil_available or not hasattr(self, 'ImageFont'):
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.fonts['small'] = self.ImageFont.load_default(size=10)
|
|
|
+ self.fonts['medium'] = self.ImageFont.load_default(size=12)
|
|
|
+ self.fonts['large'] = self.ImageFont.load_default(size=14)
|
|
|
+ print(" 默认字体加载完成")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 默认字体加载失败: {e}")
|
|
|
+
|
|
|
+ def command(self, cmd):
|
|
|
+ """发送命令"""
|
|
|
+ if self._device:
|
|
|
+ try:
|
|
|
+ self._device.write8(0x00, cmd)
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 命令发送失败: {e}")
|
|
|
+
|
|
|
+ def clear(self):
|
|
|
+ """清空显示"""
|
|
|
+ self.buffer = [0] * (self.width * self.height // 8)
|
|
|
+ if self.draw:
|
|
|
+ try:
|
|
|
+ self.draw.rectangle((0, 0, self.width, self.height), fill=0)
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 清屏失败: {e}")
|
|
|
+
|
|
|
+ def display(self):
|
|
|
+ """更新显示"""
|
|
|
+ if not self._device:
|
|
|
+ print(" I2C设备未连接")
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.command(self.COLUMNADDR)
|
|
|
+ self.command(0)
|
|
|
+ self.command(self.width - 1)
|
|
|
+
|
|
|
+ self.command(self.PAGEADDR)
|
|
|
+ self.command(0)
|
|
|
+ self.command(self.height // 8 - 1)
|
|
|
+
|
|
|
+ if self.image:
|
|
|
+ for y in range(self.height):
|
|
|
+ for x in range(self.width):
|
|
|
+ if self.image.getpixel((x, y)):
|
|
|
+ self.draw_pixel(x, y, True)
|
|
|
+
|
|
|
+ for i in range(0, len(self.buffer), 16):
|
|
|
+ chunk = self.buffer[i:i+16]
|
|
|
+ self._device.writeList(0x40, chunk)
|
|
|
+
|
|
|
+ print(" 显示更新成功")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 显示更新失败: {e}")
|
|
|
+
|
|
|
+ def draw_pixel(self, x, y, color=True):
|
|
|
+ """绘制像素"""
|
|
|
+ if x < 0 or x >= self.width or y < 0 or y >= self.height:
|
|
|
+ return
|
|
|
+
|
|
|
+ page = y // 8
|
|
|
+ bit_in_page = y % 8
|
|
|
+ buffer_index = page * self.width + x
|
|
|
+
|
|
|
+ if color:
|
|
|
+ self.buffer[buffer_index] |= (1 << bit_in_page)
|
|
|
+ else:
|
|
|
+ self.buffer[buffer_index] &= ~(1 << bit_in_page)
|
|
|
+
|
|
|
+ def draw_text(self, x, y, text, font_size='medium', color=1):
|
|
|
+ """绘制文本"""
|
|
|
+ if not self.draw:
|
|
|
+ print(f" 无法绘制文本: {text}")
|
|
|
+ return
|
|
|
+
|
|
|
+ if not self.fonts or font_size not in self.fonts:
|
|
|
+ print(f" 使用默认字体绘制: {text}")
|
|
|
+ if self.pil_available and hasattr(self, 'ImageFont'):
|
|
|
+ font = self.ImageFont.load_default(size=12)
|
|
|
+ else:
|
|
|
+ return
|
|
|
+
|
|
|
+ font = self.fonts.get(font_size, self.fonts.get('medium', None))
|
|
|
+ if not font:
|
|
|
+ print(f" 无法绘制文本: {text}")
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ color = 1 if color != 0 else 0
|
|
|
+ self.draw.text((x, y), text, font=font, fill=color)
|
|
|
+ print(f" 文本绘制成功: '{text}'")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 文本绘制失败 '{text}': {e}")
|
|
|
+
|
|
|
+ def draw_text_centered(self, y, text, font_size='medium', color=1):
|
|
|
+ """居中绘制文本"""
|
|
|
+ if not self.draw or not self.fonts or font_size not in self.fonts:
|
|
|
+ self.draw_text(0, y, text, font_size, color)
|
|
|
+ return
|
|
|
+
|
|
|
+ font = self.fonts[font_size]
|
|
|
+ try:
|
|
|
+ text_width, text_height = font.getsize(text)
|
|
|
+ x = (self.width - text_width) // 2
|
|
|
+ color = 1 if color != 0 else 0
|
|
|
+ self.draw.text((x, y), text, font=font, fill=color)
|
|
|
+ print(f" 居中文本绘制成功: '{text}'")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 居中文本绘制失败 '{text}': {e}")
|
|
|
+ self.draw_text(0, y, text, font_size, color)
|
|
|
+
|
|
|
+ def draw_rectangle(self, x, y, width, height, fill=False, outline=True):
|
|
|
+ """绘制矩形"""
|
|
|
+ if not self.draw:
|
|
|
+ print(f" 无法绘制矩形")
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.draw.rectangle((x, y, x + width - 1, y + height - 1),
|
|
|
+ fill=fill, outline=outline)
|
|
|
+ print(f" 矩形绘制成功")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 矩形绘制失败: {e}")
|
|
|
+
|
|
|
+def get_cpu_temperature():
|
|
|
+ """获取CPU温度"""
|
|
|
+ try:
|
|
|
+ with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
|
|
|
+ temp = float(f.read()) / 1000.0
|
|
|
+ return temp
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 获取温度失败: {e}")
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+def get_ip_address():
|
|
|
+ """获取IP地址"""
|
|
|
+ try:
|
|
|
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
+ s.connect(("8.8.8.8", 80))
|
|
|
+ ip = s.getsockname()[0]
|
|
|
+ s.close()
|
|
|
+ return ip
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 获取IP失败: {e}")
|
|
|
+ return "Unknown"
|
|
|
+
|
|
|
+def format_time_long(seconds):
|
|
|
+ """格式化时间"""
|
|
|
+ days = int(seconds // (3600 * 24))
|
|
|
+ hours = int((seconds % (3600 * 24)) // 3600)
|
|
|
+ minutes = int((seconds % 3600) // 60)
|
|
|
+ secs = int(seconds % 60)
|
|
|
+
|
|
|
+ if days > 0:
|
|
|
+ return f"{days}天{hours}时"
|
|
|
+ elif hours > 0:
|
|
|
+ return f"{hours}时{minutes}分"
|
|
|
+ else:
|
|
|
+ return f"{minutes}分{secs}秒"
|
|
|
+
|
|
|
+def format_bytes_short(bytes_value):
|
|
|
+ """格式化字节数"""
|
|
|
+ if bytes_value < 1024:
|
|
|
+ return f"{bytes_value}B"
|
|
|
+ elif bytes_value < 1024 * 1024:
|
|
|
+ return f"{bytes_value / 1024:.0f}K"
|
|
|
+ elif bytes_value < 1024 * 1024 * 1024:
|
|
|
+ return f"{bytes_value / (1024 * 1024):.0f}M"
|
|
|
+ else:
|
|
|
+ return f"{bytes_value / (1024 * 1024 * 1024):.1f}G"
|
|
|
+
|
|
|
+class ChineseSystemMonitorFinal:
|
|
|
+ """树莓派4B专用系统监控器"""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.display = None
|
|
|
+ self.startup_time = time.time()
|
|
|
+
|
|
|
+ # 触摸功能配置(树莓派4B优化)
|
|
|
+ self.GPIO_TOUCH_OUT = 18 # BCM18 (物理引脚12)
|
|
|
+ self.GPIO_TOUCH_IN = 24 # BCM24 (物理引脚18)
|
|
|
+ self.last_touch_time = time.time()
|
|
|
+ self.screen_on = True
|
|
|
+ self.screen_timeout = 30 # 自动熄屏时间(秒)
|
|
|
+ self.touch_available = False
|
|
|
+
|
|
|
+ print("=" * 60)
|
|
|
+ print(" 树莓派4B专用OLED监控器 v2.5")
|
|
|
+ print(" 触摸功能: GPIO18->1M电阻->金属片->GPIO24")
|
|
|
+ print("⏱ 自动熄屏: 30秒无操作")
|
|
|
+ print("=" * 60)
|
|
|
+ print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
+ print(f"Python版本: {sys.version}")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 树莓派4B启动延时
|
|
|
+ print("⏳ 系统启动延迟...")
|
|
|
+ time.sleep(3)
|
|
|
+
|
|
|
+ # 初始化GPIO触摸功能
|
|
|
+ self.init_gpio()
|
|
|
+
|
|
|
+ # 初始化显示
|
|
|
+ self._init_display()
|
|
|
+
|
|
|
+ # 显示启动信息
|
|
|
+ self._show_startup_screen()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 初始化错误: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+
|
|
|
+ def init_gpio(self):
|
|
|
+ """初始化树莓派4B GPIO(优化版)"""
|
|
|
+ print(" 初始化触摸GPIO...")
|
|
|
+ try:
|
|
|
+ # 树莓派4B必须用BCM模式
|
|
|
+ GPIO.setmode(GPIO.BCM)
|
|
|
+ GPIO.setwarnings(False) # 关闭GPIO警告
|
|
|
+
|
|
|
+ # 配置充电GPIO(输出模式)
|
|
|
+ GPIO.setup(self.GPIO_TOUCH_OUT, GPIO.OUT)
|
|
|
+ GPIO.output(self.GPIO_TOUCH_OUT, GPIO.LOW) # 初始低电平
|
|
|
+
|
|
|
+ # 配置感应GPIO(输入模式)
|
|
|
+ GPIO.setup(self.GPIO_TOUCH_IN, GPIO.IN, pull_up_down=GPIO.PUD_OFF)
|
|
|
+
|
|
|
+ # 验证初始电平(关键调试信息)
|
|
|
+ print(f" GPIO{self.GPIO_TOUCH_OUT}输出电平: {GPIO.input(self.GPIO_TOUCH_OUT)} (应为0)")
|
|
|
+ print(f" GPIO{self.GPIO_TOUCH_IN}初始电平: {GPIO.input(self.GPIO_TOUCH_IN)} (未触摸时可能为0或1)")
|
|
|
+
|
|
|
+ self.touch_available = True
|
|
|
+ print(" GPIO初始化成功")
|
|
|
+ print(f" 物理引脚对应: GPIO18=12号, GPIO24=18号")
|
|
|
+ except Exception as e:
|
|
|
+ self.touch_available = False
|
|
|
+ print(f" GPIO初始化失败: {e}")
|
|
|
+ print(" 请检查是否安装RPi.GPIO: pip3 install RPi.GPIO")
|
|
|
+
|
|
|
+ def check_touch(self):
|
|
|
+ """树莓派4B专用触摸检测(基于RC电路充放电时间检测)"""
|
|
|
+ if not self.touch_available:
|
|
|
+ return False
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 1. 放电阶段:输出低电平,让电容放电
|
|
|
+ GPIO.output(self.GPIO_TOUCH_OUT, GPIO.LOW)
|
|
|
+ time.sleep(0.001) # 1ms放电时间
|
|
|
+
|
|
|
+ # 2. 充电检测阶段:输出高电平,测量上升沿时间
|
|
|
+ GPIO.setup(self.GPIO_TOUCH_IN, GPIO.IN, pull_up_down=GPIO.PUD_OFF)
|
|
|
+ GPIO.output(self.GPIO_TOUCH_OUT, GPIO.HIGH)
|
|
|
+
|
|
|
+ # 计算放电时间
|
|
|
+ discharge_time = 0
|
|
|
+ start_time = time.time_ns()
|
|
|
+ threshold = 3000 # 触摸检测阈值,单位微秒
|
|
|
+
|
|
|
+ # 等待输入变为高电平或超时
|
|
|
+ while GPIO.input(self.GPIO_TOUCH_IN) == GPIO.LOW and discharge_time < threshold * 2:
|
|
|
+ discharge_time = (time.time_ns() - start_time) // 1000 # 转换为微秒
|
|
|
+
|
|
|
+ # 3. 恢复初始状态
|
|
|
+ GPIO.output(self.GPIO_TOUCH_OUT, GPIO.LOW)
|
|
|
+
|
|
|
+ # 4. 判断是否触摸
|
|
|
+ is_touched = discharge_time > threshold
|
|
|
+
|
|
|
+ if is_touched:
|
|
|
+ print(f" 检测到有效触摸! 放电时间: {discharge_time}μs")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ # 添加调试信息
|
|
|
+ print(f" 触摸检测 - 放电时间: {discharge_time}μs (阈值: {threshold}μs)")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 触摸检测异常: {e}")
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ def turn_screen_on(self):
|
|
|
+ """树莓派4B屏幕点亮优化(增加时序等待)"""
|
|
|
+ if not self.display or not hasattr(self.display, 'command'):
|
|
|
+ return
|
|
|
+
|
|
|
+ if not self.screen_on:
|
|
|
+ print("💡 尝试点亮屏幕...")
|
|
|
+ # 发送点亮命令
|
|
|
+ self.display.command(self.display.DISPLAYON)
|
|
|
+ # 树莓派4B需要等待OLED响应
|
|
|
+ time.sleep(0.2)
|
|
|
+ # 强制刷新显示内容
|
|
|
+ system_info = self.get_system_info()
|
|
|
+ if system_info:
|
|
|
+ self.draw_main_screen(system_info)
|
|
|
+ self.screen_on = True
|
|
|
+ print(" 屏幕已点亮")
|
|
|
+
|
|
|
+ def turn_screen_off(self):
|
|
|
+ """关闭屏幕"""
|
|
|
+ if not self.display or not hasattr(self.display, 'command'):
|
|
|
+ return
|
|
|
+
|
|
|
+ if self.screen_on:
|
|
|
+ self.display.command(self.display.DISPLAYOFF)
|
|
|
+ self.screen_on = False
|
|
|
+ print(" 屏幕已关闭")
|
|
|
+
|
|
|
+ def _init_display(self):
|
|
|
+ """初始化显示设备"""
|
|
|
+ print(" 初始化显示设备...")
|
|
|
+
|
|
|
+ # 尝试树莓派常用I2C地址
|
|
|
+ addresses = [0x3C, 0x3D]
|
|
|
+
|
|
|
+ for addr in addresses:
|
|
|
+ print(f"尝试I2C地址: 0x{addr:02X}")
|
|
|
+ try:
|
|
|
+ self.display = SSD1306Final(address=addr)
|
|
|
+ if self.display:
|
|
|
+ print(f"✅ 显示设备初始化成功 @ 0x{addr:02X}")
|
|
|
+ return
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 地址0x{addr:02X}初始化失败: {e}")
|
|
|
+
|
|
|
+ print(" 创建模拟显示")
|
|
|
+ self._create_mock_display()
|
|
|
+
|
|
|
+ def _create_mock_display(self):
|
|
|
+ """创建模拟显示对象"""
|
|
|
+ class MockDisplay:
|
|
|
+ def __init__(self):
|
|
|
+ self.draw = None
|
|
|
+ self.fonts = {}
|
|
|
+
|
|
|
+ class MockDraw:
|
|
|
+ def text(self, *args, **kwargs):
|
|
|
+ print(f" [模拟] 文本: {args}")
|
|
|
+ def rectangle(self, *args, **kwargs):
|
|
|
+ print(f" [模拟] 矩形: {args}")
|
|
|
+
|
|
|
+ self.draw = MockDraw()
|
|
|
+ self.command = lambda x: None
|
|
|
+
|
|
|
+ self.display = MockDisplay()
|
|
|
+ print(" 模拟显示对象创建成功")
|
|
|
+
|
|
|
+ def _show_startup_screen(self):
|
|
|
+ """显示启动屏幕"""
|
|
|
+ if not self.display or not self.display.draw:
|
|
|
+ print(" 无法显示启动屏幕")
|
|
|
+ return
|
|
|
+
|
|
|
+ print(" 显示启动屏幕...")
|
|
|
+ try:
|
|
|
+ self.display.clear()
|
|
|
+ self.display.draw_text_centered(10, "系统监控器", 'large')
|
|
|
+ self.display.draw_text_centered(35, "树莓派4B专用", 'small')
|
|
|
+ self.display.display()
|
|
|
+ time.sleep(2)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 启动屏幕显示失败: {e}")
|
|
|
+
|
|
|
+ def get_system_info(self):
|
|
|
+ """获取系统信息"""
|
|
|
+ try:
|
|
|
+ cpu_percent = psutil.cpu_percent(interval=0)
|
|
|
+ memory = psutil.virtual_memory()
|
|
|
+ disk = psutil.disk_usage('/')
|
|
|
+ temp = get_cpu_temperature()
|
|
|
+ system_uptime = time.time() - psutil.boot_time()
|
|
|
+ monitor_uptime = time.time() - self.startup_time
|
|
|
+ ip_address = get_ip_address()
|
|
|
+
|
|
|
+ return {
|
|
|
+ 'cpu_percent': cpu_percent,
|
|
|
+ 'cpu_temp': temp,
|
|
|
+ 'mem_used': memory.used,
|
|
|
+ 'mem_total': memory.total,
|
|
|
+ 'mem_percent': memory.percent,
|
|
|
+ 'disk_used': disk.used,
|
|
|
+ 'disk_total': disk.total,
|
|
|
+ 'disk_percent': disk.percent,
|
|
|
+ 'system_uptime': system_uptime,
|
|
|
+ 'monitor_uptime': monitor_uptime,
|
|
|
+ 'datetime': datetime.now(),
|
|
|
+ 'ip_address': ip_address
|
|
|
+ }
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"⚠️ 获取系统信息失败: {e}")
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def draw_main_screen(self, system_info):
|
|
|
+ """绘制主屏幕"""
|
|
|
+ if not self.display or not self.display.draw:
|
|
|
+ print(" 无法绘制界面")
|
|
|
+ return False
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.display.clear()
|
|
|
+
|
|
|
+ # CPU信息
|
|
|
+ y_offset = 0
|
|
|
+ if 'cpu_percent' in system_info and 'cpu_temp' in system_info:
|
|
|
+ cpu_text = f"CPU: {system_info['cpu_percent']:2.0f}% 温度: {system_info['cpu_temp']:3.0f}°C"
|
|
|
+ self.display.draw_text(2, y_offset, cpu_text, 'small')
|
|
|
+
|
|
|
+ # 内存信息
|
|
|
+ y_offset += 12
|
|
|
+ if 'mem_used' in system_info and 'mem_total' in system_info:
|
|
|
+ ram_used = format_bytes_short(system_info['mem_used'])
|
|
|
+ ram_total = format_bytes_short(system_info['mem_total'])
|
|
|
+ ram_text = f"内存: {ram_used}/{ram_total} {system_info['mem_percent']:2.0f}%"
|
|
|
+ self.display.draw_text(2, y_offset, ram_text, 'small')
|
|
|
+
|
|
|
+ # 磁盘信息
|
|
|
+ y_offset += 14
|
|
|
+ if 'disk_used' in system_info and 'disk_total' in system_info:
|
|
|
+ disk_used = format_bytes_short(system_info['disk_used'])
|
|
|
+ disk_total = format_bytes_short(system_info['disk_total'])
|
|
|
+ disk_text = f"磁盘: {disk_used}/{disk_total} {system_info['disk_percent']:2.0f}%"
|
|
|
+ self.display.draw_text(2, y_offset, disk_text, 'small')
|
|
|
+
|
|
|
+ # 系统运行时间
|
|
|
+ y_offset += 14
|
|
|
+ if 'system_uptime' in system_info:
|
|
|
+ uptime_str = format_time_long(system_info['system_uptime'])
|
|
|
+ uptime_text = f"运行: {uptime_str}"
|
|
|
+ self.display.draw_text(2, y_offset, uptime_text, 'small')
|
|
|
+
|
|
|
+ # 底部状态栏
|
|
|
+ y_offset += 11
|
|
|
+ if y_offset + 11 <= 64:
|
|
|
+ if 'datetime' in system_info:
|
|
|
+ time_str = system_info['datetime'].strftime("%H:%M")
|
|
|
+ self.display.draw_text(95, y_offset + 1, time_str, 'small', 1)
|
|
|
+
|
|
|
+ if 'ip_address' in system_info:
|
|
|
+ ip_text = f"IP:{system_info['ip_address'][:12]}"
|
|
|
+ self.display.draw_text(2, y_offset + 1, ip_text, 'small', 1)
|
|
|
+
|
|
|
+ self.display.display()
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 界面绘制失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ """运行监控器"""
|
|
|
+ print(" 开始监控系统...")
|
|
|
+ print("按 Ctrl+C 退出")
|
|
|
+ print(" 触摸金属片点亮屏幕")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ try:
|
|
|
+ update_counter = 0
|
|
|
+ loop_count = 0
|
|
|
+ while True:
|
|
|
+ loop_count += 1
|
|
|
+ # 1. 检测触摸
|
|
|
+ touch_detected = self.check_touch()
|
|
|
+
|
|
|
+ # 调试用:每隔10次循环显示一次GPIO状态
|
|
|
+ if self.touch_available and loop_count % 10 == 0:
|
|
|
+ out_level = GPIO.input(self.GPIO_TOUCH_OUT)
|
|
|
+ in_level = GPIO.input(self.GPIO_TOUCH_IN)
|
|
|
+ print(f"[调试] GPIO状态 - OUT({self.GPIO_TOUCH_OUT}): {out_level}, IN({self.GPIO_TOUCH_IN}): {in_level}")
|
|
|
+
|
|
|
+ if touch_detected:
|
|
|
+ print(f" 触摸事件触发 - 时间: {datetime.now().strftime('%H:%M:%S')}")
|
|
|
+ self.last_touch_time = time.time()
|
|
|
+ self.turn_screen_on()
|
|
|
+
|
|
|
+ # 2. 自动熄屏检查
|
|
|
+ current_time = time.time()
|
|
|
+ if self.screen_on and (current_time - self.last_touch_time) > self.screen_timeout:
|
|
|
+ print(f" 自动熄屏 - 超时时间: {self.screen_timeout}秒")
|
|
|
+ self.turn_screen_off()
|
|
|
+
|
|
|
+ # 3. 屏幕开启时更新显示
|
|
|
+ if self.screen_on:
|
|
|
+ system_info = self.get_system_info()
|
|
|
+ if system_info:
|
|
|
+ draw_success = self.draw_main_screen(system_info)
|
|
|
+
|
|
|
+ update_counter += 1
|
|
|
+ if update_counter % 5 == 0:
|
|
|
+ self._print_debug_info(system_info, draw_success)
|
|
|
+ else:
|
|
|
+ # 屏幕关闭时也显示简要信息
|
|
|
+ if loop_count % 50 == 0: # 每50次循环显示一次
|
|
|
+ print(f"💤 屏幕关闭中... 无操作时间: {current_time - self.last_touch_time:.1f}秒")
|
|
|
+
|
|
|
+ # 4. 控制循环频率
|
|
|
+ time.sleep(0.1) # 提高触摸检测频率(从0.3秒改为0.1秒)
|
|
|
+
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ print("\n 程序被用户中断")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"\n 运行时错误: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ finally:
|
|
|
+ self._cleanup()
|
|
|
+
|
|
|
+ def _print_debug_info(self, system_info, draw_success):
|
|
|
+ """打印调试信息"""
|
|
|
+ status = "" if draw_success else "❌"
|
|
|
+ screen_status = "亮" if self.screen_on else "熄"
|
|
|
+ print(f"[{datetime.now().strftime('%H:%M:%S')}] {status} 屏{screen_status} | "
|
|
|
+ f"CPU:{system_info.get('cpu_percent', 0):2.0f}%/{system_info.get('cpu_temp', 0):3.0f}°C | "
|
|
|
+ f"内存:{system_info.get('mem_percent', 0):2.0f}% | "
|
|
|
+ f"磁盘:{system_info.get('disk_percent', 0):2.0f}%")
|
|
|
+
|
|
|
+ def _cleanup(self):
|
|
|
+ """清理资源"""
|
|
|
+ print("\n 清理资源...")
|
|
|
+
|
|
|
+ # 清理GPIO
|
|
|
+ try:
|
|
|
+ if self.touch_available:
|
|
|
+ GPIO.cleanup([self.GPIO_TOUCH_OUT, self.GPIO_TOUCH_IN])
|
|
|
+ print(" GPIO资源清理完成")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" GPIO清理失败: {e}")
|
|
|
+
|
|
|
+ # 清理显示
|
|
|
+ try:
|
|
|
+ if self.display and hasattr(self.display, 'clear'):
|
|
|
+ self.display.clear()
|
|
|
+ self.display.display()
|
|
|
+ if hasattr(self.display, 'command'):
|
|
|
+ self.display.command(self.display.DISPLAYOFF)
|
|
|
+ print(" 显示清理完成")
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 显示清理失败: {e}")
|
|
|
+
|
|
|
+ print(" 程序退出")
|
|
|
+
|
|
|
+def main():
|
|
|
+ """主函数"""
|
|
|
+ try:
|
|
|
+ # 检查树莓派4B环境
|
|
|
+ import platform
|
|
|
+ print(f"设备信息: {platform.uname()}")
|
|
|
+
|
|
|
+ monitor = ChineseSystemMonitorFinal()
|
|
|
+ monitor.run()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"💥 程序启动失败: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|