小智AI MCP视觉交互2.0(超低延迟低至1s,通过外部MCP实现,与固件做解耦,小智帮我找钥匙)
初次体验小智AI MCP视觉交互功能,跟着博主的教程进行复刻,一些过程错误记录:目前已经在完美运行中...AI小智MCP摄像头该方案使用外置摄像头实现,比较理想的场景是智能家庭监控,比如家里有五个摄像头,可以让小智帮忙找宠物,“你好小智,我家帮我找下猫。博主原教程:demo环境:mac系统登陆阿里云官网,https://bailian.console.aliyun.com/?tab=model#/
初次体验小智AI MCP视觉交互功能,跟着博主的教程进行复刻,一些过程错误记录:
目前已经在完美运行中...
AI小智MCP摄像头
小智AI MCP外置视觉系统重磅升级2.0所有设备0成本0改造接入摄像头视觉系统硬件平权,代码开源!人形机器人?语音小盒子?通通给我接入AI小智MCP服务!_哔哩哔哩_bilibili
该方案使用外置摄像头实现,比较理想的场景是智能家庭监控,比如家里有五个摄像头,可以让小智帮忙找宠物,“你好小智,我家帮我找下猫。”
博主原教程:
demo环境:
mac系统
Python 3.12.10
登陆阿里云官网,https://bailian.console.aliyun.com/?tab=model#/api-key获取自己的api-key(有大量免费额度可用)
注意:pip install -r requirement.txt后如果还出现 no modole name ‘xxx’的报错,请使用pip install xxx一个个安装完成。
export MCP_ENDPOINT="你的MCP接入点地址"
export DASHSCOPE_API_KEY="填你的api-key"
pip install -r requirement.txt
python mcp_pipe,py myVL.py
注意:
1. 博主使用的mac系统,export是将这两个参数设置为系统环境变量;
2. api-key的背后是阿里云的视觉大模型,它是能进行视觉类场景交互的核心大脑;
一、源代码测试
1.1 环境准备工作
搞环境又搞了很久,这里就不展开了,细致分析都能找到问题
1.2 mcp服务连接问题
环境搞好后,连接mcp服务又卡了很久。核心问题点在与:
#源码
async def connect_to_server(uri):
"""Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
global reconnect_attempt, backoff
try:
logger.info(f"Connecting to WebSocket server...")
async with websockets.connect(uri) as websocket:
logger.info(f"Successfully connected to WebSocket server")
# Reset reconnection counter if connection closes normally
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# Start mcp_script process
process = subprocess.Popen(
['python', mcp_script], #'python'这里是问题的关键,在我的环境下必须等知名python.exe的绝对路径,否则就是会报错
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True # Use text mode
)
logger.info(f"Started {mcp_script} process")
# Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket connection closed: {e}")
raise # Re-throw exception to trigger reconnection
except Exception as e:
logger.error(f"Connection error: {e}")
raise # Re-throw exception
finally:
# Ensure the child process is properly terminated
if 'process' in locals():
logger.info(f"Terminating {mcp_script} process")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"{mcp_script} process terminated")
修改段如下:
async def connect_to_server(uri):
"""Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
global reconnect_attempt, backoff
try:
logger.info(f"Connecting to WebSocket server...")
async with websockets.connect(uri) as websocket:
-------------------------------------------------------------------------------
# Start mcp_script process
#所运行的python路径
venv_python = r"D:\github\TenenglaTech-VL-MCP\TenenglaTech-VL-MCP\venv\Scripts\python.exe"
process = subprocess.Popen(
[venv_python, mcp_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, # Use text mode
bufsize=1,
encoding='utf-8',
errors='replace',
env={**os.environ, 'PYTHONIOENCODING': 'utf-8'}
)
logger.info(f"Started {mcp_script} process")
修改好后成功运行,且可以调用mcp工具。 不过小智AI调用工具时,一直会超时。
1.3 调用超时问题
原有的逻辑是,当小智收到语音指令后,才去通过MCP服务打开摄像头,而摄像机每次打开就要很久,所以在前端上就是用户一直等待;我们需要修改逻辑,让摄像头提前预热打开,并保持常开。当小智收到指令后,截取当前帧作用图像输入,这样整条逻辑就通顺了。修改的代码如下:
# 摄像头管理器 - 保持摄像头常开
class CameraManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._cap = None
cls._instance._initialized = False
return cls._instance
def initialize(self):
"""初始化摄像头(线程安全)"""
if self._initialized:
return
try:
if platform.system() == 'Darwin':
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
self._cap = cv2.VideoCapture(0)
if not self._cap.isOpened():
logger.error("摄像头访问被拒绝,请检查系统权限")
return
# 优化摄像头参数
self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self._cap.set(cv2.CAP_PROP_FPS, 30)
self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self._initialized = True
logger.info("摄像头初始化完成")
except Exception as e:
logger.error(f"摄像头初始化失败: {str(e)}")
def get_frame(self) -> Optional[bytes]:
"""获取当前帧"""
if not self._initialized or self._cap is None or not self._cap.isOpened():
return None
try:
# 丢弃缓冲区中的旧帧
for _ in range(2):
self._cap.grab()
ret, frame = self._cap.read()
if not ret:
return None
# 高效压缩
frame = cv2.resize(frame, (320, 240))
_, buffer = cv2.imencode('.jpg', frame, [
cv2.IMWRITE_JPEG_QUALITY, 50
])
return buffer
except Exception as e:
logger.error(f"获取帧异常: {str(e)}")
return None
def release(self):
"""释放摄像头资源"""
if self._cap and self._cap.isOpened():
self._cap.release()
logger.info("摄像头资源已释放")
self._initialized = False
二、增加摄像头预览功能
原始代码中可以让小智调用本地摄像头,但是我们作为用户自己却看不到摄像头里有什么,后续在使用时一旦小智反馈错误,我们将无法判断是小智的问题还是图像的问题。让使用者能实时看到摄像头里有什么东西,十分必要。
class CameraManager:
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._cap = None
cls._instance._initialized = False
cls._instance._preview_thread = None
cls._instance._preview_active = False #增加预览
cls._instance._preview_lock = threading.Lock()
cls._instance._preview_window_name = "Camera Preview"
return cls._instance
#新增启动、预览循环和结束三个函数
def start_preview(self):
"""启动预览窗口"""
with self._preview_lock:
if self._preview_active:
logger.info("预览窗口已打开")
return
self._preview_active = True
self._preview_thread = threading.Thread(
target=self._preview_loop,
name="PreviewThread",
daemon=True
)
self._preview_thread.start()
logger.info("启动摄像头预览")
def _preview_loop(self):
"""预览窗口主循环"""
if platform.system() == 'Windows':
display_name = self._preview_window_name.encode('gbk').decode('latin-1')
else:
display_name = self._preview_window_name
try:
cv2.namedWindow(display_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(display_name, 640, 480)
while self._preview_active:
if not self._initialized or self._cap is None or not self._cap.isOpened():
time.sleep(0.1)
continue
# 获取最新帧
for _ in range(2): # 清空缓冲区
self._cap.grab()
ret, frame = self._cap.read()
if not ret:
time.sleep(0.1)
continue
# 显示帧
cv2.imshow(display_name, frame)
# 检查ESC键或窗口关闭
if cv2.waitKey(1) == 27 or cv2.getWindowProperty(display_name, cv2.WND_PROP_VISIBLE) < 1:
self.stop_preview()
break
except Exception as e:
logger.error(f"预览错误: {str(e)}")
finally:
try:
cv2.destroyWindow(display_name)
except:
pass
logger.info("预览窗口已关闭")
def stop_preview(self):
"""停止预览"""
with self._preview_lock:
if not self._preview_active:
return
self._preview_active = False
if self._preview_thread and self._preview_thread.is_alive():
self._preview_thread.join(timeout=1.0)
logger.info("预览已停止")
三、 添加监控画面
我买了一个萤石CP1智能云台摄像机(家用监控),原始代码里,摄像头是通过cv2.VideoCapture(0)来访问的。这里的参数0表示默认摄像头(通常是笔记本电脑内置摄像头)。如果要使用其他摄像头(如USB外接摄像头或网络监控摄像头),需要调整这个参数或使用摄像头的RTSP流地址。
class CameraManager:
#修改initialize函数,增加RTSP协议访问
def initialize(self):
if self._initialized:
return
try:
if platform.system() == 'Darwin':
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
# 尝试顺序:RTSP流 > 本地摄像头
sources = []
# 1. 优先尝试RTSP流
rtsp_url = os.getenv("CAMERA_RTSP_URL")
if rtsp_url:
sources.append(("RTSP流", rtsp_url))
# 2. 添加本地摄像头作为备选
sources.append(("本地摄像头", 0))
# 按顺序尝试所有来源
for source_name, source in sources:
try:
logger.info(f"尝试连接: {source_name}")
self._cap = cv2.VideoCapture(source)
# 设置超时参数(仅对部分后端有效)
self._cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) # 5秒超时
# 测试是否能读取帧
if self._cap.isOpened():
# 快速测试读取一帧
for _ in range(5): # 清空缓冲区
self._cap.grab()
ret, _ = self._cap.read()
if ret:
logger.info(f"{source_name}连接成功")
break
except Exception as e:
logger.warning(f"{source_name}连接异常: {str(e)}")
if not self._cap or not self._cap.isOpened():
logger.error("所有摄像头来源均失败")
return
# 设置摄像头参数
self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self._initialized = True
logger.info("摄像头初始化完成")
# 自动启动预览
self.start_preview()
except Exception as e:
logger.error(f"摄像头初始化失败: {str(e)}")
注意:RTSP流只在局域网内可用,如果当前运行代码的设备在外部网络,则需要利用API获取实时视频流。考虑到远程监控互动的必要性,这部分的修改放在下一篇文章里展开。
更多推荐
所有评论(0)