初次体验小智AI MCP视觉交互功能,跟着博主的教程进行复刻,一些过程错误记录:

目前已经在完美运行中...

AI小智MCP摄像头

小智AI MCP外置视觉系统重磅升级2.0所有设备0成本0改造接入摄像头视觉系统硬件平权,代码开源!人形机器人?语音小盒子?通通给我接入AI小智MCP服务!_哔哩哔哩_bilibili

该方案使用外置摄像头实现,比较理想的场景是智能家庭监控,比如家里有五个摄像头,可以让小智帮忙找宠物,“你好小智,我家帮我找下猫。”

博主原教程:

demo环境:

mac系统

Python 3.12.10

登陆阿里云官网,https://bailian.console.aliyun.com/?tab=model#/api-key获取自己的api-key(有大量免费额度可用)

注意:pip install -r requirement.txt后如果还出现 no modole name ‘xxx’的报错,请使用pip install xxx一个个安装完成。

export MCP_ENDPOINT="你的MCP接入点地址"
export DASHSCOPE_API_KEY="填你的api-key"

pip install -r requirement.txt

python mcp_pipe,py myVL.py

引用博主的开源教程

注意:

1. 博主使用的mac系统,export是将这两个参数设置为系统环境变量;

2. api-key的背后是阿里云的视觉大模型,它是能进行视觉类场景交互的核心大脑;

一、源代码测试

1.1 环境准备工作

搞环境又搞了很久,这里就不展开了,细致分析都能找到问题

1.2 mcp服务连接问题

环境搞好后,连接mcp服务又卡了很久。核心问题点在与:

#源码
async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")
            
            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF
            
            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],  #'python'这里是问题的关键,在我的环境下必须等知名python.exe的绝对路径,否则就是会报错
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")
            
            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

修改段如下:

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:

-------------------------------------------------------------------------------
            # Start mcp_script process
            #所运行的python路径
            venv_python = r"D:\github\TenenglaTech-VL-MCP\TenenglaTech-VL-MCP\venv\Scripts\python.exe" 
            process = subprocess.Popen(
                [venv_python, mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,  # Use text mode
                bufsize=1,
                encoding='utf-8',
                errors='replace',
                env={**os.environ, 'PYTHONIOENCODING': 'utf-8'}
            )
            logger.info(f"Started {mcp_script} process")

修改好后成功运行,且可以调用mcp工具。 不过小智AI调用工具时,一直会超时。

1.3 调用超时问题

原有的逻辑是,当小智收到语音指令后,才去通过MCP服务打开摄像头,而摄像机每次打开就要很久,所以在前端上就是用户一直等待;我们需要修改逻辑,让摄像头提前预热打开,并保持常开。当小智收到指令后,截取当前帧作用图像输入,这样整条逻辑就通顺了。修改的代码如下:

# 摄像头管理器 - 保持摄像头常开
class CameraManager:
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._cap = None
                cls._instance._initialized = False
        return cls._instance
    
    def initialize(self):
        """初始化摄像头(线程安全)"""
        if self._initialized:
            return
            
        try:
            if platform.system() == 'Darwin':
                os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
            
            self._cap = cv2.VideoCapture(0)
            if not self._cap.isOpened():
                logger.error("摄像头访问被拒绝,请检查系统权限")
                return
                
            # 优化摄像头参数
            self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            self._cap.set(cv2.CAP_PROP_FPS, 30)
            self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            self._initialized = True
            logger.info("摄像头初始化完成")
        except Exception as e:
            logger.error(f"摄像头初始化失败: {str(e)}")
    
    def get_frame(self) -> Optional[bytes]:
        """获取当前帧"""
        if not self._initialized or self._cap is None or not self._cap.isOpened():
            return None
        
        try:
            # 丢弃缓冲区中的旧帧
            for _ in range(2):
                self._cap.grab()
            
            ret, frame = self._cap.read()
            if not ret:
                return None
            
            # 高效压缩
            frame = cv2.resize(frame, (320, 240))
            _, buffer = cv2.imencode('.jpg', frame, [
                cv2.IMWRITE_JPEG_QUALITY, 50
            ])
            return buffer
        except Exception as e:
            logger.error(f"获取帧异常: {str(e)}")
            return None
    
    def release(self):
        """释放摄像头资源"""
        if self._cap and self._cap.isOpened():
            self._cap.release()
            logger.info("摄像头资源已释放")
            self._initialized = False

二、增加摄像头预览功能

原始代码中可以让小智调用本地摄像头,但是我们作为用户自己却看不到摄像头里有什么,后续在使用时一旦小智反馈错误,我们将无法判断是小智的问题还是图像的问题。让使用者能实时看到摄像头里有什么东西,十分必要。


class CameraManager:

     def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._cap = None
                cls._instance._initialized = False
                cls._instance._preview_thread = None
                cls._instance._preview_active = False  #增加预览
                cls._instance._preview_lock = threading.Lock()
                cls._instance._preview_window_name = "Camera Preview"
        return cls._instance
    
    #新增启动、预览循环和结束三个函数
    def start_preview(self):
        """启动预览窗口"""
        with self._preview_lock:
            if self._preview_active:
                logger.info("预览窗口已打开")
                return
                
            self._preview_active = True
            self._preview_thread = threading.Thread(
                target=self._preview_loop,
                name="PreviewThread",
                daemon=True
            )
            self._preview_thread.start()
            logger.info("启动摄像头预览")
    
    def _preview_loop(self):
        """预览窗口主循环"""
        if platform.system() == 'Windows':
            display_name = self._preview_window_name.encode('gbk').decode('latin-1')
        else:
            display_name = self._preview_window_name
            
        try:
            cv2.namedWindow(display_name, cv2.WINDOW_NORMAL)
            cv2.resizeWindow(display_name, 640, 480)
            
            while self._preview_active:
                if not self._initialized or self._cap is None or not         self._cap.isOpened():
                    time.sleep(0.1)
                    continue
                    
                # 获取最新帧
                for _ in range(2):  # 清空缓冲区
                    self._cap.grab()
                    
                ret, frame = self._cap.read()
                if not ret:
                    time.sleep(0.1)
                    continue
                
                # 显示帧
                cv2.imshow(display_name, frame)
                
                # 检查ESC键或窗口关闭
                if cv2.waitKey(1) == 27 or cv2.getWindowProperty(display_name, cv2.WND_PROP_VISIBLE) < 1:
                    self.stop_preview()
                    break
        except Exception as e:
            logger.error(f"预览错误: {str(e)}")
        finally:
            try:
                cv2.destroyWindow(display_name)
            except:
                pass
            logger.info("预览窗口已关闭")
    
    def stop_preview(self):
        """停止预览"""
        with self._preview_lock:
            if not self._preview_active:
                return
                
            self._preview_active = False
            if self._preview_thread and self._preview_thread.is_alive():
                self._preview_thread.join(timeout=1.0)
            logger.info("预览已停止")    

三、 添加监控画面

我买了一个萤石CP1智能云台摄像机(家用监控),原始代码里,摄像头是通过cv2.VideoCapture(0)来访问的。这里的参数0表示默认摄像头(通常是笔记本电脑内置摄像头)。如果要使用其他摄像头(如USB外接摄像头或网络监控摄像头),需要调整这个参数或使用摄像头的RTSP流地址。


class CameraManager:
    
    #修改initialize函数,增加RTSP协议访问
    def initialize(self):
        if self._initialized:
            return
            
        try:
            if platform.system() == 'Darwin':
                os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
            
            # 尝试顺序:RTSP流 > 本地摄像头
            sources = []
            
            # 1. 优先尝试RTSP流
            rtsp_url = os.getenv("CAMERA_RTSP_URL")
            if rtsp_url:
                sources.append(("RTSP流", rtsp_url))
            
            # 2. 添加本地摄像头作为备选
            sources.append(("本地摄像头", 0))
            
            # 按顺序尝试所有来源
            for source_name, source in sources:
                try:
                    logger.info(f"尝试连接: {source_name}")
                    self._cap = cv2.VideoCapture(source)
                    
                    # 设置超时参数(仅对部分后端有效)
                    self._cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000)  # 5秒超时
                    
                    # 测试是否能读取帧
                    if self._cap.isOpened():
                        # 快速测试读取一帧
                        for _ in range(5):  # 清空缓冲区
                            self._cap.grab()
                        ret, _ = self._cap.read()
                        if ret:
                            logger.info(f"{source_name}连接成功")
                            break
                except Exception as e:
                    logger.warning(f"{source_name}连接异常: {str(e)}")
            
            if not self._cap or not self._cap.isOpened():
                logger.error("所有摄像头来源均失败")
                return
                    
            # 设置摄像头参数
            self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            self._initialized = True
            logger.info("摄像头初始化完成")
            
            # 自动启动预览
            self.start_preview()
        except Exception as e:
            logger.error(f"摄像头初始化失败: {str(e)}")

注意:RTSP流只在局域网内可用,如果当前运行代码的设备在外部网络,则需要利用API获取实时视频流。考虑到远程监控互动的必要性,这部分的修改放在下一篇文章里展开。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐