ClaudeCode 工具执行代码示例
统一执行接口:所有工具都通过函数执行镜像工具模式:工具是从TypeScript源码镜像而来权限控制系统:支持基于上下文的权限检查执行状态跟踪:每个执行都返回完整的执行状态信息工具发现机制:支持按名称和路径搜索工具。
·
如果文章对你有帮助,请点个“关注”
工具执行核心代码
1. 完整的工具执行流程
# 工具执行完整流程示例
from execution_registry import build_execution_registry
from tools import execute_tool
# 1. 构建执行注册表
registry = build_execution_registry()
# 2. 查找工具
bash_tool = registry.tool("BashTool")
if bash_tool:
# 3. 执行工具
result = bash_tool.execute("ls -la")
print(f"执行结果: {result}")
else:
print("工具未找到")
2. 直接调用工具执行函数
from tools import execute_tool
# 直接执行工具
execution = execute_tool("BashTool", "echo 'Hello World'")
print(f"工具名称: {execution.name}")
print(f"源文件: {execution.source_hint}")
print(f"执行参数: {execution.payload}")
print(f"处理状态: {execution.handled}")
print(f"结果消息: {execution.message}")
3. 工具查找和过滤示例
from tools import find_tools, get_tools, filter_tools_by_permission_context
# 查找所有包含 "file" 的工具
file_tools = find_tools("file")
print(f"找到 {len(file_tools)} 个文件相关工具:")
for tool in file_tools:
print(f" - {tool.name} ({tool.source_hint})")
# 获取所有工具(简单模式)
simple_tools = get_tools(simple_mode=True)
print(f"\n简单模式工具 ({len(simple_tools)} 个):")
for tool in simple_tools:
print(f" - {tool.name}")
# 获取所有工具(排除MCP工具)
no_mcp_tools = get_tools(include_mcp=False)
print(f"\n排除MCP的工具 ({len(no_mcp_tools)} 个)")
工具执行代码详解
1. 执行注册表构建
# execution_registry.py 中的关键代码
def build_execution_registry() -> ExecutionRegistry:
"""构建执行注册表,包含所有命令和工具"""
return ExecutionRegistry(
commands=tuple(MirroredCommand(module.name, module.source_hint)
for module in PORTED_COMMANDS),
tools=tuple(MirroredTool(module.name, module.source_hint)
for module in PORTED_TOOLS),
)
2. 工具执行实现
# tools.py 中的 execute_tool 函数实现
def execute_tool(name: str, payload: str = '') -> ToolExecution:
"""执行指定工具
Args:
name: 工具名称
payload: 执行参数(JSON字符串)
Returns:
ToolExecution: 执行结果
"""
# 1. 查找工具
module = get_tool(name)
# 2. 检查工具是否存在
if module is None:
return ToolExecution(
name=name,
source_hint='',
payload=payload,
handled=False,
message=f'Unknown mirrored tool: {name}'
)
# 3. 执行工具(当前是镜像模式)
action = f"Mirrored tool '{module.name}' from {module.source_hint} would handle payload {payload!r}."
# 4. 返回执行结果
return ToolExecution(
name=module.name,
source_hint=module.source_hint,
payload=payload,
handled=True,
message=action
)
3. 工具权限检查
# tools.py 中的权限检查函数
def filter_tools_by_permission_context(
tools: tuple[PortingModule, ...],
permission_context: ToolPermissionContext | None = None
) -> tuple[PortingModule, ...]:
"""根据权限上下文过滤工具
Args:
tools: 工具列表
permission_context: 权限上下文
Returns:
过滤后的工具列表
"""
if permission_context is None:
return tools
# 检查每个工具是否被权限上下文阻止
return tuple(
module for module in tools
if not permission_context.blocks(module.name)
)
实际使用示例
示例1:执行Bash命令
# 执行Bash命令的完整示例
from tools import execute_tool
# 执行ls命令
result = execute_tool("BashTool", json.dumps({
"command": "ls -la",
"cwd": "/home/user",
"timeout": 30
}))
if result.handled:
print(f" 命令执行成功: {result.message}")
else:
print(f" 命令执行失败: {result.message}")
示例2:文件读取操作
# 文件读取操作示例
from tools import execute_tool
import json
# 读取文件内容
file_read_result = execute_tool("FileReadTool", json.dumps({
"filepath": "/path/to/file.txt",
"encoding": "utf-8",
"limit": 1000
}))
print(f"文件读取结果: {file_read_result.message}")
示例3:代理工具使用
# 代理工具使用示例
from tools import execute_tool
import json
# 创建新代理
agent_result = execute_tool("AgentTool", json.dumps({
"action": "create",
"name": "数据分析代理",
"purpose": "进行数据分析和可视化",
"config": {
"model": "gpt-4",
"temperature": 0.7
}
}))
print(f"代理创建结果: {agent_result.message}")
调试和监控
1. 工具执行状态监控
# 监控工具执行状态
from tools import execute_tool
from dataclasses import asdict
def monitor_tool_execution(tool_name: str, payload: str):
"""监控工具执行状态"""
print(f" 开始执行工具: {tool_name}")
print(f" 执行参数: {payload}")
# 执行工具
result = execute_tool(tool_name, payload)
# 输出详细结果
print(f"\n 执行结果:")
for key, value in asdict(result).items():
print(f" {key}: {value}")
return result
# 使用监控函数
monitor_tool_execution("BashTool", "pwd")
2. 工具执行性能分析
# 工具执行性能分析
import time
from tools import execute_tool
def analyze_tool_performance(tool_name: str, payload: str, iterations: int = 10):
"""分析工具执行性能"""
times = []
for i in range(iterations):
start_time = time.time()
result = execute_tool(tool_name, payload)
end_time = time.time()
times.append(end_time - start_time)
if not result.handled:
print(f" 第 {i+1} 次执行失败: {result.message}")
break
if times:
avg_time = sum(times) / len(times)
print(f"\n 性能分析:")
print(f" 执行次数: {len(times)}")
print(f" 平均时间: {avg_time:.4f} 秒")
print(f" 最快时间: {min(times):.4f} 秒")
print(f" 最慢时间: {max(times):.4f} 秒")
return times
# 分析BashTool性能
analyze_tool_performance("BashTool", "echo 'test'", 5)
安全最佳实践
1. 权限检查
# 安全执行工具的最佳实践
from tools import execute_tool, get_tool
from permissions import ToolPermissionContext
def safe_execute_tool(tool_name: str, payload: str, user_context: dict):
"""安全执行工具(带权限检查)"""
# 1. 创建权限上下文
permission_context = ToolPermissionContext(user_context)
# 2. 检查工具是否存在
tool = get_tool(tool_name)
if not tool:
return f" 工具 '{tool_name}' 不存在"
# 3. 检查权限
if permission_context.blocks(tool_name):
return f" 用户无权限执行工具 '{tool_name}'"
# 4. 执行工具
result = execute_tool(tool_name, payload)
# 5. 记录执行日志
log_execution(tool_name, payload, result, user_context)
return result
def log_execution(tool_name: str, payload: str, result, user_context: dict):
"""记录工具执行日志"""
log_entry = {
"timestamp": time.time(),
"tool": tool_name,
"payload": payload,
"user": user_context.get("user_id"),
"handled": result.handled,
"message": result.message[:100] # 截断长消息
}
# 保存到日志系统
print(f" 执行日志: {log_entry}")
2. 参数验证
# 工具参数验证
import json
def validate_tool_payload(tool_name: str, payload: str) -> tuple[bool, str]:
"""验证工具参数"""
try:
# 1. 尝试解析JSON
params = json.loads(payload)
# 2. 根据工具类型进行验证
if tool_name == "BashTool":
if "command" not in params:
return False, "BashTool 必须包含 'command' 参数"
# 安全检查:禁止危险命令
dangerous_commands = ["rm -rf", "format", "chmod 777"]
for cmd in dangerous_commands:
if cmd in params["command"]:
return False, f"禁止执行危险命令: {cmd}"
elif tool_name == "FileReadTool":
if "filepath" not in params:
return False, "FileReadTool 必须包含 'filepath' 参数"
# 安全检查:禁止访问系统文件
system_paths = ["/etc/passwd", "/etc/shadow", "C:\\Windows"]
for path in system_paths:
if path in params["filepath"]:
return False, f"禁止访问系统文件: {path}"
return True, "参数验证通过"
except json.JSONDecodeError:
return False, "参数必须是有效的JSON格式"
总结
关键执行代码要点:
- 统一执行接口:所有工具都通过
execute_tool()函数执行 - 镜像工具模式:工具是从TypeScript源码镜像而来
- 权限控制系统:支持基于上下文的权限检查
- 执行状态跟踪:每个执行都返回完整的执行状态信息
- 工具发现机制:支持按名称和路径搜索工具
使用建议:
- 始终进行权限检查:在执行工具前检查用户权限
- 验证工具参数:确保参数格式正确且安全
- 监控执行状态:记录工具执行日志用于调试和审计
- 使用工具池:根据需要过滤工具集合
- 处理执行失败:妥善处理工具执行失败的情况
代码位置:
- 主执行代码:
src/tools.py→execute_tool() - 执行注册表:
src/execution_registry.py - 工具定义:
src/reference_data/tools_snapshot.json - 权限控制:
src/permissions.py - 工具池管理:
src/tool_pool.py
如果文章对你有帮助,请点个“关注”
更多推荐



所有评论(0)