#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ OLED系统监控器 - 树莓派4B专用版 功能:触摸唤醒(GPIO18->1M电阻->金属片->GPIO24)+ 30秒自动熄屏 """ import time import socket import psutil from datetime import datetime import RPi.GPIO as GPIO import sys # 全局变量 DRAW_OBJECT = None class I2CDevice: def __init__(self, address, busnum=1): import smbus2 # 树莓派4B推荐使用smbus2 self._address = address self._bus = smbus2.SMBus(busnum) def write8(self, register, value): self._bus.write_byte_data(self._address, register, value) def writeList(self, register, data): self._bus.write_i2c_block_data(self._address, register, data) class SSD1306Final: """SSD1306驱动(树莓派4B优化版)""" # SSD1306寄存器地址 SETCONTRAST = 0x81 DISPLAYALLON_RESUME = 0xA4 NORMALDISPLAY = 0xA6 DISPLAYOFF = 0xAE DISPLAYON = 0xAF SETDISPLAYOFFSET = 0xD3 SETCOMPINS = 0xDA SETVCOMDETECT = 0xDB SETDISPLAYCLOCKDIV = 0xD5 SETPRECHARGE = 0xD9 SETMULTIPLEX = 0xA8 SETSTARTLINE = 0x40 MEMORYMODE = 0x20 COLUMNADDR = 0x21 PAGEADDR = 0x22 COMSCANDEC = 0xC8 SEGREMAP = 0xA0 CHARGEPUMP = 0x8D def __init__(self, width=128, height=64, address=0x3C): global DRAW_OBJECT print("=" * 60) print("SSD1306Final 初始化开始...") print(f"设备: 树莓派4B专用优化") print(f"分辨率: {width}x{height}") print(f"I2C地址: 0x{address:02X}") print("=" * 60) self.width = width self.height = height self.address = address self._device = None self.buffer = [0] * (self.width * self.height // 8) self.image = None self.fonts = None # 1. 检查PIL库 self._check_pil() # 2. 初始化draw对象 self._force_init_draw() # 3. 初始化I2C设备(树莓派4B优化) self._init_i2c(address) # 4. 初始化SSD1306硬件 if self._device: self._initialize_hardware() # 5. 加载字体 self._load_fonts() print("SSD1306Final 初始化完成!") print(f"draw对象状态: {'✅ 可用' if self.draw else '❌ 不可用'}") print(f"I2C设备状态: {'✅ 连接' if self._device else '❌ 未连接'}") print("=" * 60) def _check_pil(self): """检查PIL库""" global DRAW_OBJECT try: from PIL import Image, ImageDraw, ImageFont self.pil_available = True self.Image = Image self.ImageDraw = ImageDraw self.ImageFont = ImageFont print(" PIL库检查通过") except ImportError as e: self.pil_available = False print(f" PIL库导入失败: {e}") print(" 正在尝试安装Pillow库...") import subprocess try: subprocess.check_call([sys.executable, "-m", "pip", "install", "pillow"]) from PIL import Image, ImageDraw, ImageFont self.pil_available = True self.Image = Image self.ImageDraw = ImageDraw self.ImageFont = ImageFont print(" Pillow库安装成功") except Exception as e2: print(f" Pillow库安装失败: {e2}") def _force_init_draw(self): """强制初始化draw对象""" global DRAW_OBJECT print(" 初始化draw对象...") if self.pil_available and hasattr(self, 'Image') and hasattr(self, 'ImageDraw'): try: self.image = self.Image.new('1', (self.width, self.height)) self.draw = self.ImageDraw.Draw(self.image) DRAW_OBJECT = self.draw print(" draw对象创建成功") except Exception as e: print(f" PIL创建draw失败: {e}") self._create_fallback_draw() else: print(" 创建备用draw对象") self._create_fallback_draw() if not hasattr(self, 'draw') or self.draw is None: self._create_fallback_draw() DRAW_OBJECT = self.draw def _create_fallback_draw(self): """创建备用draw对象""" class FallbackDraw: def text(self, *args, **kwargs): print(f" 文本绘制: {args}") def rectangle(self, *args, **kwargs): print(f" 矩形绘制: {args}") self.draw = FallbackDraw() print(" 使用备用draw对象") def _init_i2c(self, address): """初始化I2C设备(树莓派4B优化)""" print(f"🔌 初始化I2C设备 @ 0x{address:02X}...") try: self._device = I2CDevice(address) print(f" I2C设备连接成功") except Exception as e: self._device = None print(f" I2C设备连接失败: {e}") print(" 请检查I2C是否启用: sudo raspi-config -> 启用I2C") def _initialize_hardware(self): """初始化SSD1306硬件""" if not self._device: print(" 跳过硬件初始化") return print(" 初始化SSD1306硬件...") try: init_commands = [ self.DISPLAYOFF, self.SETDISPLAYCLOCKDIV, 0x80, self.SETMULTIPLEX, 0x3F, self.SETDISPLAYOFFSET, 0x00, self.SETSTARTLINE | 0x00, self.CHARGEPUMP, 0x14, self.MEMORYMODE, 0x00, self.SEGREMAP | 0x01, self.COMSCANDEC, self.SETCOMPINS, 0x12, self.SETCONTRAST, 0xFF, self.SETPRECHARGE, 0xF1, self.SETVCOMDETECT, 0x40, self.DISPLAYALLON_RESUME, self.NORMALDISPLAY, self.DISPLAYON ] for cmd in init_commands: self.command(cmd) time.sleep(0.01) # 树莓派4B增加命令间隔 self.clear() self.display() print(" SSD1306硬件初始化成功") except Exception as e: print(f" SSD1306硬件初始化失败: {e}") def _load_fonts(self): """加载字体""" self.fonts = {} if not self.pil_available or not hasattr(self, 'ImageFont'): print(" 无法加载字体") return print(" 加载字体...") # 中文字体路径(树莓派常用路径) font_paths = [ '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc', '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', '/usr/share/fonts/truetype/freefont/FreeSans.ttf' ] font_path = None for path in font_paths: try: self.ImageFont.truetype(path, 12) font_path = path break except Exception as e: print(f" 字体 {path} 不可用: {e}") if font_path: print(f" 找到字体: {font_path}") try: self.fonts['small'] = self.ImageFont.truetype(font_path, 12) self.fonts['medium'] = self.ImageFont.truetype(font_path, 14) self.fonts['large'] = self.ImageFont.truetype(font_path, 18) print(" 字体加载完成") except Exception as e: print(f" 字体加载失败: {e}") self._load_default_fonts() else: print(" 使用默认字体") self._load_default_fonts() def _load_default_fonts(self): """加载默认字体""" if not self.pil_available or not hasattr(self, 'ImageFont'): return try: self.fonts['small'] = self.ImageFont.load_default(size=10) self.fonts['medium'] = self.ImageFont.load_default(size=12) self.fonts['large'] = self.ImageFont.load_default(size=14) print(" 默认字体加载完成") except Exception as e: print(f" 默认字体加载失败: {e}") def command(self, cmd): """发送命令""" if self._device: try: self._device.write8(0x00, cmd) except Exception as e: print(f" 命令发送失败: {e}") def clear(self): """清空显示""" self.buffer = [0] * (self.width * self.height // 8) if self.draw: try: self.draw.rectangle((0, 0, self.width, self.height), fill=0) except Exception as e: print(f" 清屏失败: {e}") def display(self): """更新显示""" if not self._device: print(" I2C设备未连接") return try: self.command(self.COLUMNADDR) self.command(0) self.command(self.width - 1) self.command(self.PAGEADDR) self.command(0) self.command(self.height // 8 - 1) if self.image: for y in range(self.height): for x in range(self.width): if self.image.getpixel((x, y)): self.draw_pixel(x, y, True) for i in range(0, len(self.buffer), 16): chunk = self.buffer[i:i+16] self._device.writeList(0x40, chunk) print(" 显示更新成功") except Exception as e: print(f" 显示更新失败: {e}") def draw_pixel(self, x, y, color=True): """绘制像素""" if x < 0 or x >= self.width or y < 0 or y >= self.height: return page = y // 8 bit_in_page = y % 8 buffer_index = page * self.width + x if color: self.buffer[buffer_index] |= (1 << bit_in_page) else: self.buffer[buffer_index] &= ~(1 << bit_in_page) def draw_text(self, x, y, text, font_size='medium', color=1): """绘制文本""" if not self.draw: print(f" 无法绘制文本: {text}") return if not self.fonts or font_size not in self.fonts: print(f" 使用默认字体绘制: {text}") if self.pil_available and hasattr(self, 'ImageFont'): font = self.ImageFont.load_default(size=12) else: return font = self.fonts.get(font_size, self.fonts.get('medium', None)) if not font: print(f" 无法绘制文本: {text}") return try: color = 1 if color != 0 else 0 self.draw.text((x, y), text, font=font, fill=color) print(f" 文本绘制成功: '{text}'") except Exception as e: print(f" 文本绘制失败 '{text}': {e}") def draw_text_centered(self, y, text, font_size='medium', color=1): """居中绘制文本""" if not self.draw or not self.fonts or font_size not in self.fonts: self.draw_text(0, y, text, font_size, color) return font = self.fonts[font_size] try: text_width, text_height = font.getsize(text) x = (self.width - text_width) // 2 color = 1 if color != 0 else 0 self.draw.text((x, y), text, font=font, fill=color) print(f" 居中文本绘制成功: '{text}'") except Exception as e: print(f" 居中文本绘制失败 '{text}': {e}") self.draw_text(0, y, text, font_size, color) def draw_rectangle(self, x, y, width, height, fill=False, outline=True): """绘制矩形""" if not self.draw: print(f" 无法绘制矩形") return try: self.draw.rectangle((x, y, x + width - 1, y + height - 1), fill=fill, outline=outline) print(f" 矩形绘制成功") except Exception as e: print(f" 矩形绘制失败: {e}") def get_cpu_temperature(): """获取CPU温度""" try: with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f: temp = float(f.read()) / 1000.0 return temp except Exception as e: print(f" 获取温度失败: {e}") return 0.0 def get_ip_address(): """获取IP地址""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception as e: print(f" 获取IP失败: {e}") return "Unknown" def format_time_long(seconds): """格式化时间""" days = int(seconds // (3600 * 24)) hours = int((seconds % (3600 * 24)) // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) if days > 0: return f"{days}天{hours}时" elif hours > 0: return f"{hours}时{minutes}分" else: return f"{minutes}分{secs}秒" def format_bytes_short(bytes_value): """格式化字节数""" if bytes_value < 1024: return f"{bytes_value}B" elif bytes_value < 1024 * 1024: return f"{bytes_value / 1024:.0f}K" elif bytes_value < 1024 * 1024 * 1024: return f"{bytes_value / (1024 * 1024):.0f}M" else: return f"{bytes_value / (1024 * 1024 * 1024):.1f}G" class ChineseSystemMonitorFinal: """树莓派4B专用系统监控器""" def __init__(self): self.display = None self.startup_time = time.time() # 触摸功能配置(树莓派4B优化) self.GPIO_TOUCH_OUT = 18 # BCM18 (物理引脚12) self.GPIO_TOUCH_IN = 24 # BCM24 (物理引脚18) self.last_touch_time = time.time() self.screen_on = True self.screen_timeout = 30 # 自动熄屏时间(秒) self.touch_available = False print("=" * 60) print(" 树莓派4B专用OLED监控器 v2.5") print(" 触摸功能: GPIO18->1M电阻->金属片->GPIO24") print("⏱ 自动熄屏: 30秒无操作") print("=" * 60) print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"Python版本: {sys.version}") print("=" * 60) try: # 树莓派4B启动延时 print("⏳ 系统启动延迟...") time.sleep(3) # 初始化GPIO触摸功能 self.init_gpio() # 初始化显示 self._init_display() # 显示启动信息 self._show_startup_screen() except Exception as e: print(f" 初始化错误: {e}") import traceback traceback.print_exc() def init_gpio(self): """初始化树莓派4B GPIO(优化版)""" print(" 初始化触摸GPIO...") try: # 树莓派4B必须用BCM模式 GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) # 关闭GPIO警告 # 配置充电GPIO(输出模式) GPIO.setup(self.GPIO_TOUCH_OUT, GPIO.OUT) GPIO.output(self.GPIO_TOUCH_OUT, GPIO.LOW) # 初始低电平 # 配置感应GPIO(输入模式) GPIO.setup(self.GPIO_TOUCH_IN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) # 验证初始电平(关键调试信息) print(f" GPIO{self.GPIO_TOUCH_OUT}输出电平: {GPIO.input(self.GPIO_TOUCH_OUT)} (应为0)") print(f" GPIO{self.GPIO_TOUCH_IN}初始电平: {GPIO.input(self.GPIO_TOUCH_IN)} (未触摸时可能为0或1)") self.touch_available = True print(" GPIO初始化成功") print(f" 物理引脚对应: GPIO18=12号, GPIO24=18号") except Exception as e: self.touch_available = False print(f" GPIO初始化失败: {e}") print(" 请检查是否安装RPi.GPIO: pip3 install RPi.GPIO") def check_touch(self): """树莓派4B专用触摸检测(基于RC电路充放电时间检测)""" if not self.touch_available: return False try: # 1. 放电阶段:输出低电平,让电容放电 GPIO.output(self.GPIO_TOUCH_OUT, GPIO.LOW) time.sleep(0.001) # 1ms放电时间 # 2. 充电检测阶段:输出高电平,测量上升沿时间 GPIO.setup(self.GPIO_TOUCH_IN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) GPIO.output(self.GPIO_TOUCH_OUT, GPIO.HIGH) # 计算放电时间 discharge_time = 0 start_time = time.time_ns() threshold = 3000 # 触摸检测阈值,单位微秒 # 等待输入变为高电平或超时 while GPIO.input(self.GPIO_TOUCH_IN) == GPIO.LOW and discharge_time < threshold * 2: discharge_time = (time.time_ns() - start_time) // 1000 # 转换为微秒 # 3. 恢复初始状态 GPIO.output(self.GPIO_TOUCH_OUT, GPIO.LOW) # 4. 判断是否触摸 is_touched = discharge_time > threshold if is_touched: print(f" 检测到有效触摸! 放电时间: {discharge_time}μs") return True else: # 添加调试信息 print(f" 触摸检测 - 放电时间: {discharge_time}μs (阈值: {threshold}μs)") except Exception as e: print(f" 触摸检测异常: {e}") return False def turn_screen_on(self): """树莓派4B屏幕点亮优化(增加时序等待)""" if not self.display or not hasattr(self.display, 'command'): return if not self.screen_on: print("💡 尝试点亮屏幕...") # 发送点亮命令 self.display.command(self.display.DISPLAYON) # 树莓派4B需要等待OLED响应 time.sleep(0.2) # 强制刷新显示内容 system_info = self.get_system_info() if system_info: self.draw_main_screen(system_info) self.screen_on = True print(" 屏幕已点亮") def turn_screen_off(self): """关闭屏幕""" if not self.display or not hasattr(self.display, 'command'): return if self.screen_on: self.display.command(self.display.DISPLAYOFF) self.screen_on = False print(" 屏幕已关闭") def _init_display(self): """初始化显示设备""" print(" 初始化显示设备...") # 尝试树莓派常用I2C地址 addresses = [0x3C, 0x3D] for addr in addresses: print(f"尝试I2C地址: 0x{addr:02X}") try: self.display = SSD1306Final(address=addr) if self.display: print(f"✅ 显示设备初始化成功 @ 0x{addr:02X}") return except Exception as e: print(f" 地址0x{addr:02X}初始化失败: {e}") print(" 创建模拟显示") self._create_mock_display() def _create_mock_display(self): """创建模拟显示对象""" class MockDisplay: def __init__(self): self.draw = None self.fonts = {} class MockDraw: def text(self, *args, **kwargs): print(f" [模拟] 文本: {args}") def rectangle(self, *args, **kwargs): print(f" [模拟] 矩形: {args}") self.draw = MockDraw() self.command = lambda x: None self.display = MockDisplay() print(" 模拟显示对象创建成功") def _show_startup_screen(self): """显示启动屏幕""" if not self.display or not self.display.draw: print(" 无法显示启动屏幕") return print(" 显示启动屏幕...") try: self.display.clear() self.display.draw_text_centered(10, "系统监控器", 'large') self.display.draw_text_centered(35, "树莓派4B专用", 'small') self.display.display() time.sleep(2) except Exception as e: print(f" 启动屏幕显示失败: {e}") def get_system_info(self): """获取系统信息""" try: cpu_percent = psutil.cpu_percent(interval=0) memory = psutil.virtual_memory() disk = psutil.disk_usage('/') temp = get_cpu_temperature() system_uptime = time.time() - psutil.boot_time() monitor_uptime = time.time() - self.startup_time ip_address = get_ip_address() return { 'cpu_percent': cpu_percent, 'cpu_temp': temp, 'mem_used': memory.used, 'mem_total': memory.total, 'mem_percent': memory.percent, 'disk_used': disk.used, 'disk_total': disk.total, 'disk_percent': disk.percent, 'system_uptime': system_uptime, 'monitor_uptime': monitor_uptime, 'datetime': datetime.now(), 'ip_address': ip_address } except Exception as e: print(f"⚠️ 获取系统信息失败: {e}") return {} def draw_main_screen(self, system_info): """绘制主屏幕""" if not self.display or not self.display.draw: print(" 无法绘制界面") return False try: self.display.clear() # CPU信息 y_offset = 0 if 'cpu_percent' in system_info and 'cpu_temp' in system_info: cpu_text = f"CPU: {system_info['cpu_percent']:2.0f}% 温度: {system_info['cpu_temp']:3.0f}°C" self.display.draw_text(2, y_offset, cpu_text, 'small') # 内存信息 y_offset += 12 if 'mem_used' in system_info and 'mem_total' in system_info: ram_used = format_bytes_short(system_info['mem_used']) ram_total = format_bytes_short(system_info['mem_total']) ram_text = f"内存: {ram_used}/{ram_total} {system_info['mem_percent']:2.0f}%" self.display.draw_text(2, y_offset, ram_text, 'small') # 磁盘信息 y_offset += 14 if 'disk_used' in system_info and 'disk_total' in system_info: disk_used = format_bytes_short(system_info['disk_used']) disk_total = format_bytes_short(system_info['disk_total']) disk_text = f"磁盘: {disk_used}/{disk_total} {system_info['disk_percent']:2.0f}%" self.display.draw_text(2, y_offset, disk_text, 'small') # 系统运行时间 y_offset += 14 if 'system_uptime' in system_info: uptime_str = format_time_long(system_info['system_uptime']) uptime_text = f"运行: {uptime_str}" self.display.draw_text(2, y_offset, uptime_text, 'small') # 底部状态栏 y_offset += 11 if y_offset + 11 <= 64: if 'datetime' in system_info: time_str = system_info['datetime'].strftime("%H:%M") self.display.draw_text(95, y_offset + 1, time_str, 'small', 1) if 'ip_address' in system_info: ip_text = f"IP:{system_info['ip_address'][:12]}" self.display.draw_text(2, y_offset + 1, ip_text, 'small', 1) self.display.display() return True except Exception as e: print(f" 界面绘制失败: {e}") return False def run(self): """运行监控器""" print(" 开始监控系统...") print("按 Ctrl+C 退出") print(" 触摸金属片点亮屏幕") print("=" * 60) try: update_counter = 0 loop_count = 0 while True: loop_count += 1 # 1. 检测触摸 touch_detected = self.check_touch() # 调试用:每隔10次循环显示一次GPIO状态 if self.touch_available and loop_count % 10 == 0: out_level = GPIO.input(self.GPIO_TOUCH_OUT) in_level = GPIO.input(self.GPIO_TOUCH_IN) print(f"[调试] GPIO状态 - OUT({self.GPIO_TOUCH_OUT}): {out_level}, IN({self.GPIO_TOUCH_IN}): {in_level}") if touch_detected: print(f" 触摸事件触发 - 时间: {datetime.now().strftime('%H:%M:%S')}") self.last_touch_time = time.time() self.turn_screen_on() # 2. 自动熄屏检查 current_time = time.time() if self.screen_on and (current_time - self.last_touch_time) > self.screen_timeout: print(f" 自动熄屏 - 超时时间: {self.screen_timeout}秒") self.turn_screen_off() # 3. 屏幕开启时更新显示 if self.screen_on: system_info = self.get_system_info() if system_info: draw_success = self.draw_main_screen(system_info) update_counter += 1 if update_counter % 5 == 0: self._print_debug_info(system_info, draw_success) else: # 屏幕关闭时也显示简要信息 if loop_count % 50 == 0: # 每50次循环显示一次 print(f"💤 屏幕关闭中... 无操作时间: {current_time - self.last_touch_time:.1f}秒") # 4. 控制循环频率 time.sleep(0.1) # 提高触摸检测频率(从0.3秒改为0.1秒) except KeyboardInterrupt: print("\n 程序被用户中断") except Exception as e: print(f"\n 运行时错误: {e}") import traceback traceback.print_exc() finally: self._cleanup() def _print_debug_info(self, system_info, draw_success): """打印调试信息""" status = "" if draw_success else "❌" screen_status = "亮" if self.screen_on else "熄" print(f"[{datetime.now().strftime('%H:%M:%S')}] {status} 屏{screen_status} | " f"CPU:{system_info.get('cpu_percent', 0):2.0f}%/{system_info.get('cpu_temp', 0):3.0f}°C | " f"内存:{system_info.get('mem_percent', 0):2.0f}% | " f"磁盘:{system_info.get('disk_percent', 0):2.0f}%") def _cleanup(self): """清理资源""" print("\n 清理资源...") # 清理GPIO try: if self.touch_available: GPIO.cleanup([self.GPIO_TOUCH_OUT, self.GPIO_TOUCH_IN]) print(" GPIO资源清理完成") except Exception as e: print(f" GPIO清理失败: {e}") # 清理显示 try: if self.display and hasattr(self.display, 'clear'): self.display.clear() self.display.display() if hasattr(self.display, 'command'): self.display.command(self.display.DISPLAYOFF) print(" 显示清理完成") except Exception as e: print(f" 显示清理失败: {e}") print(" 程序退出") def main(): """主函数""" try: # 检查树莓派4B环境 import platform print(f"设备信息: {platform.uname()}") monitor = ChineseSystemMonitorFinal() monitor.run() except Exception as e: print(f"💥 程序启动失败: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()