import logging
import math
import os
import threading
import time
# NOTE: wildcard import kept as in the original so every socket name that
# other code may rely on stays in scope (socket, AF_INET, SOCK_STREAM, ...).
from socket import *  # noqa: F401,F403

# pip3 install smbus -> used to talk to the PCF8591 chip
import smbus

_LOG = logging.getLogger(__name__)


class ControlServer(threading.Thread):
    """Daemon thread that samples four I2C readings and streams them over TCP.

    On construction the thread opens a TCP socket to ``ADDR`` and an SMBus
    handle on bus 1.  Once started, it repeatedly reads one byte for each of
    the four control bytes ``P1``..``P4`` from the device at ``ADR`` and sends
    the values as a compact JSON message over the socket.
    """

    # Wire format of one message; the four placeholders are filled with the
    # readings for P1, P2 (MOTOR) and P3, P4 (SERVO), in that order.
    _PAYLOAD_TEMPLATE = (
        '{"type":"conctrl","data":[{"name":"MOTOR","value":[%s,%s]},'
        '{"name":"SERVO","value":[%s,%s]}]}'
    )
    # Pause between two consecutive channel reads inside one sample, seconds.
    _CHANNEL_SETTLE_S = 0.02
    # Pause after each message is sent, seconds.
    _SEND_INTERVAL_S = 0.03

    def __init__(self, ADDR):
        """Open the TCP connection and the I2C bus.

        Args:
            ADDR: Address handed to ``socket.connect_ex``; for the AF_INET
                socket created here that is a ``(host, port)`` pair.

        A failed connection does not raise (``connect_ex`` reports an error
        code instead); the failure is logged and will surface later when
        ``run`` tries to send.
        """
        # daemon=True replaces the deprecated setDaemon(True).
        threading.Thread.__init__(self, daemon=True)

        # Connect the socket.
        self.sock = socket(AF_INET, SOCK_STREAM)
        err = self.sock.connect_ex(ADDR)
        if err:
            # Previously the error code was discarded silently.
            _LOG.warning(
                "connect to %s failed: [errno %d] %s",
                ADDR, err, os.strerror(err),
            )

        # Initialise smbus: 1 selects scl.1, 0 selects scl.0.
        self.BUS = smbus.SMBus(1)
        # Device address used for every write_byte/read_byte call
        # (original comment: "register address").
        self.ADR = 0x48
        # Control bytes selecting the potentiometer inputs (original comment:
        # "correspond to module in1, in2").
        # NOTE(review): on a PCF8591, 0x40-0x43 select AIN0-AIN3 and bit 0x04
        # is the auto-increment flag, so 0x44 may not address a fourth
        # channel -- confirm against the datasheet/wiring.
        self.P1 = 0x41
        self.P2 = 0x42
        self.P3 = 0x43
        self.P4 = 0x44
        # Centre (rest) reading; captured from P1 when the thread starts.
        self.CENTER = 0
        # Final direction label.
        self.CON = 'Center'
        # Dead-band half-width around CENTER used by Residual.
        self.VD = 10

    def __del__(self):
        """Close the socket if it was created."""
        # __init__ may have failed before self.sock was assigned.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def run(self):
        """Capture the centre value, then send one sample per loop forever."""
        # Initial converter reading, taken from P1, becomes the centre value.
        self.CENTER = self._read_channel(self.P1)
        while True:
            self.sock.sendall(self.Orientation())
            time.sleep(self._SEND_INTERVAL_S)

    def _read_channel(self, control):
        """Write ``control`` to the device, then read back one byte."""
        # NOTE(review): the PCF8591 datasheet describes a read as returning
        # the previous conversion result; confirm whether a dummy read is
        # needed here.  Behaviour is left exactly as in the original.
        self.BUS.write_byte(self.ADR, control)
        return self.BUS.read_byte(self.ADR)

    # The tolerance (VD) should be tuned to the actual hardware.
    def Residual(self, v):
        """Snap ``v`` to ``CENTER`` unless it deviates by more than ``VD``.

        Args:
            v: A raw reading.

        Returns:
            ``v`` when ``|v - CENTER| > VD``; otherwise ``CENTER``.
        """
        # Not called from the code visible in this file.
        if math.fabs(v - self.CENTER) > self.VD:
            return v
        return self.CENTER

    # Read all direction inputs.
    def Orientation(self):
        """Read P1..P4 once each and return the encoded JSON message.

        Returns:
            bytes: The payload built from the four readings, ready for
            ``socket.sendall``.
        """
        readings = []
        for index, control in enumerate((self.P1, self.P2, self.P3, self.P4)):
            if index:
                # Pause between channels only (not before the first read or
                # after the last), matching the original timing.
                time.sleep(self._CHANNEL_SETTLE_S)
            readings.append(self._read_channel(control))
        # Build the message that is sent over the socket.
        return (self._PAYLOAD_TEMPLATE % tuple(readings)).encode()