WebSocket 测试

#!/Users/luca/miniforge3/envs/py311/bin/python
# -*- coding: utf-8 -*-
"""Smoke test for the Codex app-server over WebSocket."""

from __future__ import annotations

import asyncio
import json
from typing import Any, Dict

import websockets

# =========================
# Test parameters: edit them here directly (the script reads no
# command-line arguments).
# =========================
# WebSocket endpoint that run_ws_smoke_test() connects to.
WS_URL = "ws://127.0.0.1:8766"
# Sent as the "cwd" parameter of the thread/start request.
TEST_CWD = "/Users/luca/tuya/test/codex_appserver"
# Client name and version reported in the initialize request's "clientInfo".
CLIENT_NAME = "local-ws-tester"
CLIENT_VERSION = "1.0.0"
# Prompts sent in order, one turn/start request each, all on the same thread.
PT_ARRAY = [
    "你会哪些skill",
    "我银行卡余额300元,存款100,现在余额多少",
    "继续存款10000,现在余额多少"
]
# Upper bound, in seconds, applied to each wait for a reply and to the
# collection of one turn's streamed output.
TIMEOUT_SECONDS = 180


async def wait_for_message(ws, matcher, timeout: int) -> Dict[str, Any]:
    """循环读取消息,直到命中 matcher 条件或超时。"""
    async with asyncio.timeout(timeout):
        while True:
            raw = await ws.recv()
            data = json.loads(raw)
            if isinstance(data, dict) and matcher(data):
                return data


async def send_single_turn_request(
    ws, thread_id: str, pt: str, turn_index: int
) -> str:
    """Send one ``turn/start`` request and return the id of the created turn.

    Args:
        ws: Open WebSocket connection to the app-server.
        thread_id: Id of the thread the turn belongs to.
        pt: Prompt text, sent as a single ``text`` input item.
        turn_index: Index of the turn; used to build the request id
            ``turn-<turn_index>``.

    Returns:
        The value found at ``result.turn.id`` in the reply.

    Raises:
        RuntimeError: The server answered the request with an ``error``
            member instead of a ``result``.
        TimeoutError: No reply for this request id arrived within
            ``TIMEOUT_SECONDS``.
    """
    request_id = f"turn-{turn_index}"
    request = {
        "id": request_id,
        "method": "turn/start",
        "params": {
            "threadId": thread_id,
            "input": [{"type": "text", "text": pt}],
        },
    }
    await ws.send(json.dumps(request, ensure_ascii=False))

    # Accept error replies as well: matching only on "result" would drop the
    # error and leave the caller waiting until the timeout expires.
    turn_resp = await wait_for_message(
        ws,
        lambda m: m.get("id") == request_id and ("result" in m or "error" in m),
        TIMEOUT_SECONDS,
    )
    if "result" not in turn_resp:
        raise RuntimeError(f"turn/start 请求失败: {turn_resp['error']}")
    return turn_resp["result"]["turn"]["id"]


async def parse_single_turn_result(ws, turn_id: str) -> str:
    """Gather the streamed reply text of one turn.

    Reads messages until a ``turn/completed`` notification whose
    ``params.turn.id`` equals ``turn_id`` arrives. Along the way, every
    non-empty ``delta`` from ``item/agentMessage/delta`` notifications
    carrying the same ``turnId`` is kept; all other messages are ignored.

    Args:
        ws: Open WebSocket connection to the app-server.
        turn_id: Id of the turn whose output should be collected.

    Returns:
        The collected deltas concatenated in arrival order.

    Raises:
        TimeoutError: The turn did not complete within ``TIMEOUT_SECONDS``.
    """
    chunks: list[str] = []
    finished = False
    async with asyncio.timeout(TIMEOUT_SECONDS):
        while not finished:
            message = json.loads(await ws.recv())
            if not isinstance(message, dict):
                continue

            kind = message.get("method")
            params = message.get("params") or {}

            if kind == "item/agentMessage/delta":
                # Keep only non-empty text fragments that belong to this turn.
                if params.get("turnId") == turn_id and (
                    piece := params.get("delta", "")
                ):
                    chunks.append(piece)
            elif kind == "turn/completed":
                # Completion notices for other turns do not end the loop.
                finished = (params.get("turn") or {}).get("id") == turn_id

    return "".join(chunks)


async def run_single_turn(ws, thread_id: str, pt: str, turn_index: int) -> dict[str, str]:
    """Run one complete turn: submit the prompt, then collect the reply.

    Args:
        ws: Open WebSocket connection to the app-server.
        thread_id: Id of the thread the turn belongs to.
        pt: Prompt text for this turn.
        turn_index: Index of the turn, forwarded to the request sender.

    Returns:
        A dict with ``"turn_id"`` (the id returned for the request) and
        ``"reply"`` (the collected reply text).
    """
    turn_id = await send_single_turn_request(ws, thread_id, pt, turn_index)
    print(f"turn/start 已接受, turn_id={turn_id}", flush=True)

    outcome: dict[str, str] = {"turn_id": turn_id}
    outcome["reply"] = await parse_single_turn_result(ws, turn_id)
    return outcome


async def _call_method(
    ws, request_id: str, method: str, params: Dict[str, Any]
) -> Any:
    """Send one request and return the ``result`` member of its reply.

    Raises:
        RuntimeError: The reply for ``request_id`` carries an ``error``
            member instead of a ``result``.
        TimeoutError: No reply for ``request_id`` arrived within
            ``TIMEOUT_SECONDS``.
    """
    await ws.send(
        json.dumps(
            {"id": request_id, "method": method, "params": params},
            ensure_ascii=False,
        )
    )
    # Match error replies too, so a rejected request fails immediately
    # instead of waiting out the whole timeout.
    reply = await wait_for_message(
        ws,
        lambda m: m.get("id") == request_id and ("result" in m or "error" in m),
        TIMEOUT_SECONDS,
    )
    if "result" not in reply:
        raise RuntimeError(f"{method} 请求失败: {reply['error']}")
    return reply["result"]


async def run_ws_smoke_test() -> None:
    """Run the WebSocket smoke test against the app-server.

    Steps: connect to ``WS_URL``; send ``initialize`` and wait for its
    result; send the ``initialized`` notification and a ``thread/start``
    request; then run every prompt in ``PT_ARRAY`` as one turn on the
    created thread, printing each reply.

    Raises:
        ValueError: ``PT_ARRAY`` is empty.
        RuntimeError: ``initialize`` or ``thread/start`` was answered with
            an error reply.
        TimeoutError: An expected reply did not arrive in time.
    """
    if not PT_ARRAY:
        raise ValueError("PT_ARRAY 不能为空。")

    print("[WS 1/5] 连接 app-server...", flush=True)
    async with websockets.connect(WS_URL, max_size=10 * 1024 * 1024) as ws:
        print("[WS 2/5] 发送 initialize...", flush=True)
        init_result = await _call_method(
            ws,
            "init-1",
            "initialize",
            {"clientInfo": {"name": CLIENT_NAME, "version": CLIENT_VERSION}},
        )
        print(f"initialize 响应: {init_result}", flush=True)

        print("[WS 3/5] 发送 initialized + thread/start...", flush=True)
        # "initialized" is sent without an id, so no reply is awaited for it.
        await ws.send(json.dumps({"method": "initialized"}, ensure_ascii=False))
        thread_result = await _call_method(
            ws,
            "thread-1",
            "thread/start",
            {
                "cwd": TEST_CWD,
                "approvalPolicy": "never",
                "sandbox": "workspace-write",
            },
        )
        thread_id = thread_result["thread"]["id"]
        print(f"thread/start 成功, thread_id={thread_id}", flush=True)

        print("[WS 4/5] 循环处理 PT_ARRAY...", flush=True)
        # Turns run one after another on the same thread.
        for idx, pt in enumerate(PT_ARRAY, start=1):
            print(f"[TURN {idx}] 输入 PT: {pt}", flush=True)
            turn_result = await run_single_turn(ws, thread_id, pt, idx)
            assistant_text = turn_result["reply"]
            print(f"[TURN {idx}] 模型回复: {assistant_text}", flush=True)
        print("✅ WebSocket 多轮数组测试通过。", flush=True)


# Run the smoke test only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    asyncio.run(run_ws_smoke_test())

chrome CDP

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐