PyQt5+Socket+DeepSeekAI智能对话:构建树莓派无线步进电机控制与监测系统
【PyQt5+Socket+DeepSeekAI智能对话:构建树莓派无线步进电机控制与监测系统】本项目旨在构建一个基于 PyQt5 的桌面操作界面,实现与树莓派5的无线通信,控制42步进电机,同时实现实时高度检测与控制。系统还引入了 DeepSeek AI 对话接口,用户可通过界面直接提问和交互,扩展智能助手功能。
·
系列文章目录
【树莓派5实战】控制步进电机全攻略:D36A驱动42步进 + ULN驱动28步进 + 超声波测距
基于Python内置socket库实现网络通信
PyQt5入门篇:手把手教你打造Python GUI应用
PyQt5进阶篇:自定义控件、多线程、数据库集成、图表和数据可视化进阶功能GUI应用实战开发
前言
本文主要用pyqt5库制作一个操作界面与树莓派5无线通信,读取树莓派传感器数据,操纵树莓派控制步进电机,与deepseek对话。
项目概述
本项目旨在构建一个基于 PyQt5 的桌面操作界面,实现与树莓派5的无线通信,控制42步进电机,同时实现实时高度检测与控制。系统还引入了 DeepSeek AI 对话接口,用户可通过界面直接提问和交互,扩展智能助手功能。
应用场景包括但不限于:
- 智能机械臂末端高度控制
- 实验室自动升降平台调节
- 教学演示系统与AI交互辅助
- 小型工厂设备人机交互终端
一、操作界面
二、系统架构图
编辑用户操作界面(PyQt5)
├── Socket通信 -> 树莓派(接收命令/返回状态)
│ ├── 控制步进电机
│ └── 读取超声波模块高度
└── DeepSeek接口 -> AI对话反馈
三、如何运行
- 安装依赖:
pip install PyQt5 requests
- 获取 DeepSeek API 密钥: 注册并获取:https://deepseek.com/
- 配置 IP 与端口: 在主界面输入树莓派的实际 IP 和端口(默认为8888)。
- 运行主程序:
python main.py
二、代码
pc端代码:
import sys
import socket
import json
import threading
import time
import requests
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLabel, QPushButton, QSlider, QSpinBox,
QGroupBox, QLineEdit, QMessageBox, QFrame, QSplitter,
QGridLayout, QProgressBar, QTextEdit, QScrollArea)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QThread
from PyQt5.QtGui import QFont, QIcon, QColor, QPalette, QTextCursor
from PyQt5.QtChart import QChart, QChartView, QLineSeries, QValueAxis, QDateTimeAxis
from PyQt5.QtCore import QDateTime, QPointF
class DeepSeekClient(QThread):
"""DeepSeek API客户端线程类"""
response_received = pyqtSignal(str)
stream_chunk_received = pyqtSignal(str)
api_error = pyqtSignal(str)
def __init__(self, api_key):
super().__init__()
self.api_key = api_key
self.api_url = "https://api.deepseek.com/v1/chat/completions"
self.prompt = ""
self.running = False
self.streaming = True # 默认使用流式响应
def set_prompt(self, prompt):
"""设置提示词"""
self.prompt = prompt
def run(self):
"""线程主循环,调用DeepSeek API"""
self.running = True
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": self.prompt}],
"stream": self.streaming
}
try:
if self.streaming:
# 流式响应处理
response = requests.post(
self.api_url,
headers=headers,
json=data,
stream=True
)
if response.status_code == 200:
full_response = ""
for line in response.iter_lines():
if line:
line_text = line.decode('utf-8')
if line_text.startswith("data: "):
json_str = line_text[6:]
if json_str.strip() == "[DONE]":
break
try:
chunk_data = json.loads(json_str)
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
chunk = chunk_data["choices"][0].get("delta", {}).get("content", "")
if chunk:
full_response += chunk
self.stream_chunk_received.emit(chunk)
except json.JSONDecodeError:
pass
self.response_received.emit(full_response)
else:
self.api_error.emit(f"API错误: {response.status_code} - {response.text}")
else:
# 非流式响应处理
response = requests.post(
self.api_url,
headers=headers,
json=data
)
if response.status_code == 200:
response_data = response.json()
if "choices" in response_data and len(response_data["choices"]) > 0:
message = response_data["choices"][0]["message"]["content"]
self.response_received.emit(message)
else:
self.api_error.emit(f"API错误: {response.status_code} - {response.text}")
except Exception as e:
self.api_error.emit(f"请求失败: {str(e)}")
self.running = False
def stop(self):
"""停止线程"""
self.running = False
class SocketClient(QThread):
"""Socket客户端线程类"""
status_received = pyqtSignal(dict)
connection_error = pyqtSignal(str)
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.socket = None
self.running = False
self.connected = False
self.buffer = ""
def connect_to_server(self):
"""连接到服务器"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(5) # 设置连接超时
self.socket.connect((self.host, self.port))
self.socket.settimeout(None) # 恢复正常模式
self.connected = True
return True
except Exception as e:
self.connection_error.emit(f"连接服务器失败: {str(e)}")
self.connected = False
return False
def send_command(self, command):
"""发送命令到服务器"""
if not self.connected:
if not self.connect_to_server():
return False
try:
message = json.dumps(command) + '\n'
self.socket.send(message.encode('utf-8'))
return True
except Exception as e:
self.connection_error.emit(f"发送命令失败: {str(e)}")
self.connected = False
return False
def run(self):
"""线程主循环,接收服务器数据"""
self.running = True
while self.running:
if not self.connected:
if not self.connect_to_server():
time.sleep(2) # 连接失败,等待2秒后重试
continue
try:
data = self.socket.recv(1024).decode('utf-8')
if not data:
self.connected = False
continue
self.buffer += data
# 处理可能的多条或不完整的消息
while '\n' in self.buffer:
line, self.buffer = self.buffer.split('\n', 1)
try:
status = json.loads(line)
self.status_received.emit(status)
except json.JSONDecodeError:
pass
except Exception as e:
self.connected = False
self.connection_error.emit(f"接收数据失败: {str(e)}")
time.sleep(1) # 出错后等待1秒
def stop(self):
"""停止线程并关闭连接"""
self.running = False
if self.socket:
try:
self.socket.close()
except:
pass
class MotorControlApp(QMainWindow):
"""电机控制应用主窗口"""
def __init__(self):
super().__init__()
self.setWindowTitle("步进电机控制系统")
self.setMinimumSize(1200, 700) # 增加窗口大小以容纳对话界面
# 设置应用样式
self.setup_style()
# 初始化UI
self.init_ui()
# 初始化Socket客户端
self.client = None
self.server_ip = "192.168.43.22" # 默认树莓派IP
self.server_port = 8888 # 默认端口
# 初始化DeepSeek客户端
self.deepseek_client = None
self.api_key = "" # 需要设置API密钥
# 状态变量
self.motor42_running = False
self.height_control_active = False
# 启动定时器,用于更新UI
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.update_ui)
self.update_timer.start(500) # 每500ms更新一次UI
self.connection_start_time = 0
self.chart_data = {"timestamps": [], "height": [], "speed": []} # 存储图表数据
def setup_style(self):
"""设置应用样式"""
# 设置全局字体
font = QFont("微软雅黑", 10)
QApplication.setFont(font)
# 设置样式表
self.setStyleSheet("""
QMainWindow {
background-color: #f5f5f5;
}
QGroupBox {
font-weight: bold;
border: 1px solid #cccccc;
border-radius: 5px;
margin-top: 10px;
padding-top: 15px;
}
QGroupBox::title {
subcontrol-origin: margin;
subcontrol-position: top center;
padding: 0 5px;
background-color: #f5f5f5;
}
QPushButton {
background-color: #4a86e8;
color: white;
border: none;
padding: 8px 16px;
border-radius: 4px;
font-weight: bold;
}
QPushButton:hover {
background-color: #3a76d8;
}
QPushButton:pressed {
background-color: #2a66c8;
}
QPushButton:disabled {
background-color: #cccccc;
color: #666666;
}
QSlider::groove:horizontal {
border: 1px solid #999999;
height: 8px;
background: #cccccc;
margin: 2px 0;
border-radius: 4px;
}
QSlider::handle:horizontal {
background: #4a86e8;
border: 1px solid #5c5c5c;
width: 18px;
margin: -2px 0;
border-radius: 9px;
}
QLabel {
color: #333333;
}
QLineEdit {
padding: 6px;
border: 1px solid #cccccc;
border-radius: 4px;
}
QProgressBar {
border: 1px solid #cccccc;
border-radius: 5px;
text-align: center;
}
QProgressBar::chunk {
background-color: #4a86e8;
border-radius: 5px;
}
""")
def init_ui(self):
"""初始化用户界面"""
# 创建中央部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 主布局 - 改为水平布局,左侧为原控制界面,右侧为对话界面
main_layout = QHBoxLayout(central_widget)
main_layout.setSpacing(10)
main_layout.setContentsMargins(10, 10, 10, 10)
# 左侧控制面板
control_panel = QWidget()
control_layout = QVBoxLayout(control_panel)
control_layout.setSpacing(10)
control_layout.setContentsMargins(10, 10, 10, 10)
# 标题标签
title_label = QLabel("步进电机与高度控制系统")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("font-size: 18px; font-weight: bold; margin-bottom: 5px;")
control_layout.addWidget(title_label)
# 添加水平分割线
line = QFrame()
line.setFrameShape(QFrame.HLine)
line.setFrameShadow(QFrame.Sunken)
line.setStyleSheet("background-color: #cccccc;")
control_layout.addWidget(line)
# 服务器连接部分
connection_group = QGroupBox("服务器连接")
connection_layout = QGridLayout()
connection_layout.setColumnStretch(1, 1)
self.ip_input = QLineEdit("192.168.1.100")
self.ip_input.setPlaceholderText("输入树莓派IP地址")
self.port_input = QLineEdit("8888")
self.port_input.setPlaceholderText("输入端口号")
self.port_input.setMaximumWidth(80)
self.connect_button = QPushButton("连接服务器")
self.connect_button.clicked.connect(self.toggle_connection)
connection_layout.addWidget(QLabel("IP:"), 0, 0)
connection_layout.addWidget(self.ip_input, 0, 1)
connection_layout.addWidget(QLabel("端口:"), 0, 2)
connection_layout.addWidget(self.port_input, 0, 3)
connection_layout.addWidget(self.connect_button, 0, 4)
connection_group.setLayout(connection_layout)
control_layout.addWidget(connection_group)
# 创建分割器,分为上下两部分
control_splitter = QSplitter(Qt.Vertical)
control_layout.addWidget(control_splitter, 1) # 让分割器占据更多空间
# 上部面板 - 状态显示
upper_panel = QWidget()
upper_layout = QHBoxLayout(upper_panel)
upper_layout.setContentsMargins(0, 0, 0, 0)
# 状态显示部分
status_group = QGroupBox("系统状态")
status_layout = QGridLayout()
# 连接状态指示器
self.connection_indicator = QLabel("●")
self.connection_indicator.setStyleSheet("color: red; font-size: 20px;")
self.connection_status = QLabel("未连接")
self.connection_status.setStyleSheet("font-weight: bold;")
# 电机状态指示器
self.motor42_indicator = QLabel("●")
self.motor42_indicator.setStyleSheet("color: red; font-size: 20px;")
self.motor42_status = QLabel("42电机: 停止")
# 速度显示
self.motor42_speed_label = QLabel("速度: 0 Hz")
# 高度显示
self.height_progress = QProgressBar()
self.height_progress.setRange(0, 50)
self.height_progress.setValue(0)
self.height_progress.setFormat("当前高度: %v 厘米")
self.height_progress.setTextVisible(True)
self.status_target_height_label = QLabel("目标高度: 0.0 厘米")
# 高度控制状态
self.height_control_indicator = QLabel("●")
self.height_control_indicator.setStyleSheet("color: red; font-size: 20px;")
self.height_control_status = QLabel("高度控制: 未激活")
# 添加到网格布局
status_layout.addWidget(self.connection_indicator, 0, 0)
status_layout.addWidget(self.connection_status, 0, 1)
status_layout.addWidget(self.motor42_indicator, 1, 0)
status_layout.addWidget(self.motor42_status, 1, 1)
status_layout.addWidget(QLabel(""), 2, 0)
status_layout.addWidget(self.motor42_speed_label, 2, 1)
status_layout.addWidget(QLabel("高度:"), 3, 0, 1, 2)
status_layout.addWidget(self.height_progress, 4, 0, 1, 2)
status_layout.addWidget(self.status_target_height_label, 5, 0, 1, 2)
status_layout.addWidget(self.height_control_indicator, 6, 0)
status_layout.addWidget(self.height_control_status, 6, 1)
status_group.setLayout(status_layout)
# 实时数据监控
data_group = QGroupBox("实时数据监控")
data_layout = QGridLayout()
data_layout.setColumnStretch(0, 1)
data_layout.setColumnStretch(1, 1)
# 创建数据标签
self.current_speed_label = QLabel("当前速度: 0 Hz")
self.target_speed_label = QLabel("目标速度: 0 Hz")
self.current_height_label = QLabel("当前高度: 0.0 cm")
self.target_height_label = QLabel("目标高度: 0.0 cm")
self.height_error_label = QLabel("高度误差: 0.0 cm")
self.uptime_label = QLabel("运行时间: 00:00:00")
# 设置标签样式
for label in [self.current_speed_label, self.target_speed_label,
self.current_height_label, self.target_height_label,
self.height_error_label, self.uptime_label]:
label.setStyleSheet("font-weight: bold; color: #2c3e50;")
label.setAlignment(Qt.AlignCenter)
# 添加到布局
data_layout.addWidget(QLabel("电机状态"), 0, 0, 1, 2)
data_layout.addWidget(self.current_speed_label, 1, 0)
data_layout.addWidget(self.target_speed_label, 1, 1)
data_layout.addWidget(QLabel("高度状态"), 2, 0, 1, 2)
data_layout.addWidget(self.current_height_label, 3, 0)
data_layout.addWidget(self.target_height_label, 3, 1)
data_layout.addWidget(self.height_error_label, 4, 0)
data_layout.addWidget(self.uptime_label, 4, 1)
data_group.setLayout(data_layout)
upper_layout.addWidget(status_group)
upper_layout.addWidget(data_group)
# 下部面板 - 控制部分
lower_panel = QWidget()
lower_layout = QHBoxLayout(lower_panel)
lower_layout.setContentsMargins(0, 0, 0, 0)
# 42电机控制部分
motor42_group = QGroupBox("42步进电机控制")
motor42_layout = QVBoxLayout()
# 速度控制
speed_layout = QVBoxLayout()
speed_label = QLabel("速度控制 (Hz):")
speed_label.setStyleSheet("font-weight: bold;")
speed_layout.addWidget(speed_label)
speed_slider_layout = QHBoxLayout()
self.speed_slider = QSlider(Qt.Horizontal)
self.speed_slider.setRange(100, 2000)
self.speed_slider.setValue(1000)
self.speed_slider.setTickPosition(QSlider.TicksBelow)
self.speed_slider.setTickInterval(100)
self.speed_spinbox = QSpinBox()
self.speed_spinbox.setRange(100, 2000)
self.speed_spinbox.setValue(1000)
self.speed_spinbox.setSuffix(" Hz")
# 连接滑块和数字输入框
self.speed_slider.valueChanged.connect(self.speed_spinbox.setValue)
self.speed_spinbox.valueChanged.connect(self.speed_slider.setValue)
self.speed_spinbox.valueChanged.connect(self.set_motor42_speed)
speed_slider_layout.addWidget(QLabel("100"))
speed_slider_layout.addWidget(self.speed_slider)
speed_slider_layout.addWidget(QLabel("2000"))
speed_slider_layout.addWidget(self.speed_spinbox)
speed_layout.addLayout(speed_slider_layout)
motor42_layout.addLayout(speed_layout)
# 添加分隔线
line2 = QFrame()
line2.setFrameShape(QFrame.HLine)
line2.setFrameShadow(QFrame.Sunken)
motor42_layout.addWidget(line2)
# 启动/停止按钮
button_layout = QHBoxLayout()
button_label = QLabel("电机控制:")
button_label.setStyleSheet("font-weight: bold;")
button_layout.addWidget(button_label)
self.start_button = QPushButton("启动电机")
self.start_button.setMinimumHeight(40)
self.stop_button = QPushButton("停止电机")
self.stop_button.setMinimumHeight(40)
self.stop_button.setStyleSheet("background-color: #e74c3c;")
self.start_button.clicked.connect(self.start_motor42)
self.stop_button.clicked.connect(self.stop_motor42)
button_layout.addWidget(self.start_button)
button_layout.addWidget(self.stop_button)
motor42_layout.addLayout(button_layout)
motor42_group.setLayout(motor42_layout)
# 高度控制部分
height_group = QGroupBox("高度控制")
height_layout = QVBoxLayout()
# 目标高度设置
target_layout = QVBoxLayout()
target_label = QLabel("目标高度设置 (厘米):")
target_label.setStyleSheet("font-weight: bold;")
target_layout.addWidget(target_label)
target_slider_layout = QHBoxLayout()
self.target_slider = QSlider(Qt.Horizontal)
self.target_slider.setRange(5, 50)
self.target_slider.setValue(20)
self.target_slider.setTickPosition(QSlider.TicksBelow)
self.target_slider.setTickInterval(5)
self.target_spinbox = QSpinBox()
self.target_spinbox.setRange(5, 50)
self.target_spinbox.setValue(20)
self.target_spinbox.setSuffix(" cm")
# 连接滑块和数字输入框
self.target_slider.valueChanged.connect(self.target_spinbox.setValue)
self.target_spinbox.valueChanged.connect(self.target_slider.setValue)
self.target_spinbox.valueChanged.connect(self.set_target_height)
target_slider_layout.addWidget(QLabel("5"))
target_slider_layout.addWidget(self.target_slider)
target_slider_layout.addWidget(QLabel("50"))
target_slider_layout.addWidget(self.target_spinbox)
target_layout.addLayout(target_slider_layout)
height_layout.addLayout(target_layout)
# 添加分隔线
line3 = QFrame()
line3.setFrameShape(QFrame.HLine)
line3.setFrameShadow(QFrame.Sunken)
height_layout.addWidget(line3)
# 高度控制按钮
height_button_layout = QHBoxLayout()
height_control_label = QLabel("高度控制:")
height_control_label.setStyleSheet("font-weight: bold;")
height_button_layout.addWidget(height_control_label)
self.activate_height_button = QPushButton("激活高度控制")
self.activate_height_button.setMinimumHeight(40)
self.activate_height_button.setStyleSheet("background-color: #27ae60;")
self.deactivate_height_button = QPushButton("停止高度控制")
self.deactivate_height_button.setMinimumHeight(40)
self.deactivate_height_button.setStyleSheet("background-color: #e74c3c;")
self.activate_height_button.clicked.connect(self.activate_height_control)
self.deactivate_height_button.clicked.connect(self.deactivate_height_control)
height_button_layout.addWidget(self.activate_height_button)
height_button_layout.addWidget(self.deactivate_height_button)
height_layout.addLayout(height_button_layout)
height_group.setLayout(height_layout)
lower_layout.addWidget(motor42_group)
lower_layout.addWidget(height_group)
# 图表部分
chart_group = QGroupBox("实时数据图表")
chart_layout = QVBoxLayout()
self.chart_view = self.create_realtime_chart()
chart_layout.addWidget(self.chart_view)
chart_group.setLayout(chart_layout)
# 添加各部分到分割器
control_splitter.addWidget(upper_panel)
control_splitter.addWidget(lower_panel)
control_splitter.addWidget(chart_group)
control_splitter.setSizes([200, 200, 300]) # 设置初始大小比例
# 右侧DeepSeek对话面板
chat_panel = QWidget()
chat_layout = QVBoxLayout(chat_panel)
chat_layout.setSpacing(10)
chat_layout.setContentsMargins(10, 10, 10, 10)
# DeepSeek API设置
api_group = QGroupBox("DeepSeek API设置")
api_layout = QHBoxLayout()
self.api_key_input = QLineEdit()
self.api_key_input.setPlaceholderText("输入DeepSeek API密钥")
self.api_key_input.setEchoMode(QLineEdit.Password) # 密码模式显示
self.save_api_key_button = QPushButton("保存密钥")
self.save_api_key_button.clicked.connect(self.save_api_key)
api_layout.addWidget(QLabel("API密钥:"))
api_layout.addWidget(self.api_key_input)
api_layout.addWidget(self.save_api_key_button)
api_group.setLayout(api_layout)
chat_layout.addWidget(api_group)
# 对话历史
chat_history_group = QGroupBox("对话历史")
chat_history_layout = QVBoxLayout()
self.chat_history = QTextEdit()
self.chat_history.setReadOnly(True)
self.chat_history.setStyleSheet("""
QTextEdit {
background-color: #f8f9fa;
border: 1px solid #ddd;
border-radius: 5px;
padding: 10px;
}
""")
chat_history_layout.addWidget(self.chat_history)
chat_history_group.setLayout(chat_history_layout)
chat_layout.addWidget(chat_history_group, 1) # 让对话历史占据更多空间
# 用户输入
input_group = QGroupBox("发送消息")
input_layout = QVBoxLayout()
self.user_input = QTextEdit()
self.user_input.setPlaceholderText("输入您的问题...")
self.user_input.setMaximumHeight(100)
self.user_input.setStyleSheet("""
QTextEdit {
background-color: white;
border: 1px solid #ddd;
border-radius: 5px;
padding: 10px;
}
""")
self.send_button = QPushButton("发送")
self.send_button.setMinimumHeight(40)
self.send_button.setStyleSheet("background-color: #27ae60;")
self.send_button.clicked.connect(self.send_message)
input_layout.addWidget(self.user_input)
input_layout.addWidget(self.send_button)
input_group.setLayout(input_layout)
chat_layout.addWidget(input_group)
# 设置左右面板比例
main_layout.addWidget(control_panel, 2) # 控制面板占2/3
main_layout.addWidget(chat_panel, 1) # 对话面板占1/3
# 添加DeepSeek相关方法
def save_api_key(self):
"""保存DeepSeek API密钥"""
self.api_key = self.api_key_input.text().strip()
if self.api_key:
QMessageBox.information(self, "成功", "API密钥已保存")
# 初始化DeepSeek客户端
self.deepseek_client = DeepSeekClient(self.api_key)
self.deepseek_client.response_received.connect(self.handle_complete_response)
self.deepseek_client.stream_chunk_received.connect(self.handle_stream_chunk)
self.deepseek_client.api_error.connect(self.handle_api_error)
else:
QMessageBox.warning(self, "错误", "请输入有效的API密钥")
def send_message(self):
"""发送消息到DeepSeek"""
if not hasattr(self, 'deepseek_client') or not self.deepseek_client:
QMessageBox.warning(self, "错误", "请先设置API密钥")
return
user_message = self.user_input.toPlainText().strip()
if not user_message:
return
# 添加用户消息到对话历史
self.chat_history.append(f"<b>您:</b> {user_message}")
self.chat_history.append("") # 空行
# 添加AI回复的占位符
self.chat_history.append("<b>DeepSeek:</b> ")
self.current_response_cursor = self.chat_history.textCursor()
self.current_response_cursor.movePosition(QTextCursor.End)
# 清空输入框
self.user_input.clear()
# 禁用发送按钮,防止重复发送
self.send_button.setEnabled(False)
self.send_button.setText("正在回复...")
# 发送到DeepSeek
self.deepseek_client.set_prompt(user_message)
self.deepseek_client.start()
def handle_stream_chunk(self, chunk):
"""处理流式响应的数据块"""
# 在当前位置插入文本
self.chat_history.textCursor().insertText(chunk)
# 滚动到底部
self.chat_history.verticalScrollBar().setValue(
self.chat_history.verticalScrollBar().maximum()
)
def handle_complete_response(self, response):
"""处理完整的响应"""
# 添加空行,为下一次对话做准备
self.chat_history.append("")
self.chat_history.append("")
# 恢复发送按钮
self.send_button.setEnabled(True)
self.send_button.setText("发送")
# 滚动到底部
self.chat_history.verticalScrollBar().setValue(
self.chat_history.verticalScrollBar().maximum()
)
def handle_api_error(self, error_msg):
"""处理API错误"""
self.chat_history.append(f"<span style='color: red;'><b>错误:</b> {error_msg}</span>")
self.chat_history.append("")
# 恢复发送按钮
self.send_button.setEnabled(True)
self.send_button.setText("发送")
# 其余方法保持不变
def toggle_connection(self):
"""切换连接状态"""
if self.client and self.client.connected:
self.disconnect_from_server()
else:
self.connect_to_server()
def connect_to_server(self):
"""连接到服务器"""
try:
self.server_ip = self.ip_input.text()
self.server_port = int(self.port_input.text())
# 创建并启动Socket客户端线程
self.client = SocketClient(self.server_ip, self.server_port)
self.client.status_received.connect(self.handle_status_update)
self.client.connection_error.connect(self.handle_connection_error)
self.client.start()
self.connection_start_time = time.time() # 记录连接开始时间
self.connect_button.setText("断开连接")
self.connection_status.setText("已连接")
except ValueError:
QMessageBox.warning(self, "输入错误", "请输入有效的端口号")
def disconnect_from_server(self):
"""断开服务器连接"""
if self.client:
self.client.stop()
self.client.wait()
self.client = None
self.connect_button.setText("连接")
self.connection_status.setText("未连接")
def handle_status_update(self, status):
"""处理服务器发送的状态更新"""
# 更新状态变量
self.motor42_running = status.get('motor42_running', False)
self.motor42_speed = status.get('motor42_speed', 0)
self.current_height = status.get('current_height', 0)
self.target_height = status.get('target_height', 20)
self.height_control_active = status.get('height_control_active', False)
self.update_chart()
# 更新UI将在update_ui方法中完成
def handle_connection_error(self, error_msg):
"""处理连接错误"""
self.connection_status.setText(f"连接错误")
print(error_msg) # 在控制台打印详细错误
def update_ui(self):
"""更新UI显示"""
# 更新连接状态
if hasattr(self, 'client') and self.client and self.client.connected:
self.connection_indicator.setStyleSheet("color: green; font-size: 20px;")
self.connection_status.setText("已连接")
else:
self.connection_indicator.setStyleSheet("color: red; font-size: 20px;")
self.connection_status.setText("未连接")
# 更新42电机状态
if hasattr(self, 'motor42_running'):
if self.motor42_running:
self.motor42_indicator.setStyleSheet("color: green; font-size: 20px;")
self.motor42_status.setText("42电机: 运行中")
else:
self.motor42_indicator.setStyleSheet("color: red; font-size: 20px;")
self.motor42_status.setText("42电机: 停止")
# 更新42电机速度
if hasattr(self, 'motor42_speed'):
self.motor42_speed_label.setText(f"速度: {self.motor42_speed} Hz")
# 更新高度信息
if hasattr(self, 'current_height'):
self.height_progress.setValue(int(self.current_height))
self.height_progress.setFormat(f"当前高度: {self.current_height:.1f} 厘米")
if hasattr(self, 'target_height'):
self.status_target_height_label.setText(f"目标高度: {self.target_height:.1f} 厘米")
# 更新高度控制状态
if hasattr(self, 'height_control_active'):
if self.height_control_active:
self.height_control_indicator.setStyleSheet("color: green; font-size: 20px;")
self.height_control_status.setText("高度控制: 已激活")
self.activate_height_button.setEnabled(False)
self.deactivate_height_button.setEnabled(True)
else:
self.height_control_indicator.setStyleSheet("color: red; font-size: 20px;")
self.height_control_status.setText("高度控制: 未激活")
self.activate_height_button.setEnabled(True)
self.deactivate_height_button.setEnabled(False)
if hasattr(self, 'motor42_speed'):
self.current_speed_label.setText(f"当前速度: {self.motor42_speed} Hz")
self.target_speed_label.setText(f"目标速度: {self.speed_spinbox.value()} Hz")
if hasattr(self, 'current_height') and hasattr(self, 'target_height'):
self.current_height_label.setText(f"当前高度: {self.current_height:.1f} cm")
self.target_height_label.setText(f"目标高度: {self.target_height:.1f} cm")
height_error = abs(self.current_height - self.target_height)
self.height_error_label.setText(f"高度误差: {height_error:.1f} cm")
# 更新运行时间
if self.client and self.client.connected:
uptime = time.time() - self.connection_start_time
hours = int(uptime // 3600)
minutes = int((uptime % 3600) // 60)
seconds = int(uptime % 60)
self.uptime_label.setText(f"运行时间: {hours:02d}:{minutes:02d}:{seconds:02d}")
else:
self.uptime_label.setText("运行时间: 00:00:00")
def set_motor42_speed(self, speed):
"""设置42电机速度"""
if self.client and self.client.connected:
self.client.send_command({'motor42_speed': speed})
def start_motor42(self):
"""启动42电机"""
if self.client and self.client.connected:
self.client.send_command({'motor42_start': True})
def stop_motor42(self):
"""停止42电机"""
if self.client and self.client.connected:
self.client.send_command({'motor42_stop': True})
def set_target_height(self, height):
"""设置目标高度"""
if self.client and self.client.connected:
self.client.send_command({'target_height': height})
def activate_height_control(self):
"""激活高度控制"""
if self.client and self.client.connected:
self.client.send_command({'height_control_active': True})
def deactivate_height_control(self):
"""停止高度控制"""
if self.client and self.client.connected:
self.client.send_command({'height_control_active': False})
def create_realtime_chart(self):
"""创建实时数据图表"""
# 创建图表
chart = QChart()
chart.setTitle("实时数据监控")
chart.setAnimationOptions(QChart.SeriesAnimations)
# 创建数据序列
self.height_series = QLineSeries()
self.height_series.setName("高度 (cm)")
self.speed_series = QLineSeries()
self.speed_series.setName("速度 (Hz)")
# 添加到图表
chart.addSeries(self.height_series)
chart.addSeries(self.speed_series)
# 时间轴
self.time_axis = QDateTimeAxis()
self.time_axis.setFormat("hh:mm:ss")
self.time_axis.setTitleText("时间")
chart.addAxis(self.time_axis, Qt.AlignBottom)
# 数值轴
self.value_axis = QValueAxis()
self.value_axis.setTitleText("数值")
chart.addAxis(self.value_axis, Qt.AlignLeft)
# 绑定坐标轴
self.height_series.attachAxis(self.time_axis)
self.height_series.attachAxis(self.value_axis)
self.speed_series.attachAxis(self.time_axis)
self.speed_series.attachAxis(self.value_axis)
# 创建视图
chart_view = QChartView(chart)
chart_view.setMinimumHeight(300)
return chart_view
def update_chart(self):
"""更新图表数据"""
# 保留最近60秒数据
max_points = 60
current_time = QDateTime.currentDateTime()
# 更新高度数据
if hasattr(self, 'current_height'):
self.height_series.append(current_time.toMSecsSinceEpoch(), self.current_height)
if len(self.height_series) > max_points:
self.height_series.removePoints(0, len(self.height_series) - max_points)
# 更新速度数据
if hasattr(self, 'motor42_speed'):
self.speed_series.append(current_time.toMSecsSinceEpoch(), self.motor42_speed)
if len(self.speed_series) > max_points:
self.speed_series.removePoints(0, len(self.speed_series) - max_points)
# 调整坐标轴范围
self.time_axis.setRange(
QDateTime.fromMSecsSinceEpoch(current_time.toMSecsSinceEpoch() - 60000),
current_time
)
# 确保数值轴有合适的范围
max_value = 10 # 默认最小值
if hasattr(self, 'motor42_speed') and hasattr(self, 'current_height'):
max_value = max(self.motor42_speed, self.current_height, max_value)
self.value_axis.setRange(0, max_value + 10)
def closeEvent(self, event):
"""窗口关闭事件处理"""
if self.client:
self.client.stop()
self.client.wait()
if hasattr(self, 'deepseek_client') and self.deepseek_client:
self.deepseek_client.stop()
self.deepseek_client.wait()
event.accept()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MotorControlApp()
window.show()
sys.exit(app.exec_())
总结
本系统融合了 PyQt5 图形界面开发、Socket 网络通信、Raspberry Pi 控制、数据可视化与 AI 接口集成,完整展示了一个面向物联网与智能应用的综合项目框架。后续可拓展功能如远程监控、模型预测、语音控制、远程Web管理等,为学习与实践提供了良好的模板。
更多推荐
所有评论(0)