codex app server 探索
【代码】codex app server 探索。
·



WebSocket 测试
#!/Users/luca/miniforge3/envs/py311/bin/python
# -*- coding: utf-8 -*-
"""Codex app-server WebSocket 冒烟测试。"""
from __future__ import annotations
import asyncio
import json
from typing import Any, Dict
import websockets
# =========================
# Test parameters: edit them here directly (no command-line arguments)
# =========================
# WebSocket endpoint passed to websockets.connect().
WS_URL = "ws://127.0.0.1:8766"
# Sent as the "cwd" parameter of the thread/start request.
TEST_CWD = "/Users/luca/tuya/test/codex_appserver"
# Sent as clientInfo.name / clientInfo.version in the initialize request.
CLIENT_NAME = "local-ws-tester"
CLIENT_VERSION = "1.0.0"
# Prompts sent one per turn, in order, on the same thread.
PT_ARRAY = [
"你会哪些skill",
"我银行卡余额300元,存款100,现在余额多少",
"继续存款10000,现在余额多少"
]
# Deadline in seconds applied to each wait for a server response or turn completion.
TIMEOUT_SECONDS = 180
async def wait_for_message(ws, matcher, timeout: int) -> Dict[str, Any]:
"""循环读取消息,直到命中 matcher 条件或超时。"""
async with asyncio.timeout(timeout):
while True:
raw = await ws.recv()
data = json.loads(raw)
if isinstance(data, dict) and matcher(data):
return data
async def send_single_turn_request(
    ws, thread_id: str, pt: str, turn_index: int
) -> str:
    """Send one ``turn/start`` request and return the id of the started turn.

    The request id is derived from ``turn_index`` (``turn-<index>``) and the
    prompt is sent as a single text input item.

    Args:
        ws: Open connection exposing awaitable ``send()`` / ``recv()``.
        thread_id: Id of the thread the turn belongs to.
        pt: Prompt text for this turn.
        turn_index: Index of the turn, used to build the request id.

    Returns:
        The ``id`` of the turn found at ``result.turn.id`` in the response.

    Raises:
        RuntimeError: If the server answers the request with an ``error``
            member instead of a ``result``.
        TimeoutError: If no response arrives within ``TIMEOUT_SECONDS``.
    """
    request_id = f"turn-{turn_index}"
    request = {
        "id": request_id,
        "method": "turn/start",
        "params": {
            "threadId": thread_id,
            "input": [{"type": "text", "text": pt}],
        },
    }
    await ws.send(json.dumps(request, ensure_ascii=False))

    # Match error replies too: waiting only for "result" would silently drop
    # a rejection and stall until the timeout expires.
    turn_resp = await wait_for_message(
        ws,
        lambda m: m.get("id") == request_id and ("result" in m or "error" in m),
        TIMEOUT_SECONDS,
    )
    if "result" not in turn_resp:
        raise RuntimeError(f"turn/start failed: {turn_resp['error']}")
    return turn_resp["result"]["turn"]["id"]
async def parse_single_turn_result(ws, turn_id: str) -> str:
    """Collect the assistant's streamed text for one turn.

    Reads messages from ``ws`` and accumulates the ``delta`` of every
    ``item/agentMessage/delta`` notification whose ``turnId`` equals
    ``turn_id``. Stops at the ``turn/completed`` notification whose
    ``turn.id`` equals ``turn_id``; all other messages are ignored.

    Args:
        ws: Open connection exposing an awaitable ``recv()``.
        turn_id: Id of the turn whose output is collected.

    Returns:
        The concatenation of all non-empty deltas, in arrival order.

    Raises:
        TimeoutError: If the turn does not complete within
            ``TIMEOUT_SECONDS``.
    """
    chunks: list[str] = []
    async with asyncio.timeout(TIMEOUT_SECONDS):
        while True:
            event = json.loads(await ws.recv())
            if not isinstance(event, dict):
                continue
            kind = event.get("method")
            body = event.get("params") or {}
            if kind == "item/agentMessage/delta":
                if body.get("turnId") == turn_id:
                    piece = body.get("delta", "")
                    if piece:
                        chunks.append(piece)
            elif kind == "turn/completed":
                finished = body.get("turn") or {}
                # Completion of a different turn does not end the wait.
                if finished.get("id") == turn_id:
                    return "".join(chunks)
async def run_single_turn(ws, thread_id: str, pt: str, turn_index: int) -> dict[str, str]:
    """Run one complete turn: send the request, then gather the reply.

    Args:
        ws: Open connection to the app-server.
        thread_id: Id of the thread the turn belongs to.
        pt: Prompt text for this turn.
        turn_index: Index of the turn, forwarded to the request sender.

    Returns:
        A dict with ``turn_id`` (id of the started turn) and ``reply``
        (the collected assistant text).
    """
    outcome = {"turn_id": await send_single_turn_request(ws, thread_id, pt, turn_index)}
    print(f"turn/start 已接受, turn_id={outcome['turn_id']}", flush=True)
    outcome["reply"] = await parse_single_turn_result(ws, outcome["turn_id"])
    return outcome
async def _call_and_await_result(
    ws, request_id: str, method: str, params: Dict[str, Any]
) -> Any:
    """Send one request and return the ``result`` member of its response.

    Raises:
        RuntimeError: If the response carries ``error`` instead of ``result``.
        TimeoutError: If no response arrives within ``TIMEOUT_SECONDS``.
    """
    await ws.send(
        json.dumps(
            {"id": request_id, "method": method, "params": params},
            ensure_ascii=False,
        )
    )
    # Match error replies too, so a rejected request fails right away instead
    # of stalling until the timeout expires.
    response = await wait_for_message(
        ws,
        lambda m: m.get("id") == request_id and ("result" in m or "error" in m),
        TIMEOUT_SECONDS,
    )
    if "result" not in response:
        raise RuntimeError(f"{method} failed: {response['error']}")
    return response["result"]


async def run_ws_smoke_test() -> None:
    """Run the WebSocket smoke test against the app-server.

    Steps: connect to ``WS_URL``, send ``initialize``, send the
    ``initialized`` notification followed by ``thread/start``, then run one
    turn per entry of ``PT_ARRAY`` on the created thread, printing progress
    and each reply.

    Raises:
        ValueError: If ``PT_ARRAY`` is empty.
        RuntimeError: If ``initialize`` or ``thread/start`` is answered with
            an error response.
        TimeoutError: If an expected response does not arrive within
            ``TIMEOUT_SECONDS``.
    """
    if not PT_ARRAY:
        raise ValueError("PT_ARRAY 不能为空。")

    print("[WS 1/5] 连接 app-server...", flush=True)
    async with websockets.connect(WS_URL, max_size=10 * 1024 * 1024) as ws:
        print("[WS 2/5] 发送 initialize...", flush=True)
        init_result = await _call_and_await_result(
            ws,
            "init-1",
            "initialize",
            {"clientInfo": {"name": CLIENT_NAME, "version": CLIENT_VERSION}},
        )
        print(f"initialize 响应: {init_result}", flush=True)

        print("[WS 3/5] 发送 initialized + thread/start...", flush=True)
        # "initialized" is a notification (no id), so no response is awaited.
        await ws.send(json.dumps({"method": "initialized"}, ensure_ascii=False))
        thread_result = await _call_and_await_result(
            ws,
            "thread-1",
            "thread/start",
            {
                "cwd": TEST_CWD,
                "approvalPolicy": "never",
                "sandbox": "workspace-write",
            },
        )
        thread_id = thread_result["thread"]["id"]
        print(f"thread/start 成功, thread_id={thread_id}", flush=True)

        print("[WS 4/5] 循环处理 PT_ARRAY...", flush=True)
        # Turns run sequentially on the same thread.
        for idx, pt in enumerate(PT_ARRAY, start=1):
            print(f"[TURN {idx}] 输入 PT: {pt}", flush=True)
            turn_result = await run_single_turn(ws, thread_id, pt, idx)
            assistant_text = turn_result["reply"]
            print(f"[TURN {idx}] 模型回复: {assistant_text}", flush=True)
        print("✅ WebSocket 多轮数组测试通过。", flush=True)
# Script entry point: run the smoke test only when executed directly.
if __name__ == "__main__":
    asyncio.run(run_ws_smoke_test())

chrome CDP


更多推荐



所有评论(0)