引言:别拿 PC 端的阔气挑战嵌入式的贫穷

看你的原版代码,你定义了一个包含各种大数组的 Node 结构体,然后还在 main 函数里用 malloc 去给这些节点分配内存。在 PC 端跑仿真,你有 16GB 的 DDR4 内存,你感觉不到痛。但如果让你把这套共识算法移植到只有 128KB SRAM 的 STM32 或者车规级 MCU 上,上电瞬间必定 HardFault(硬件错误),直接堆栈溢出死机。

面试官在这里绝对会埋伏你:“请问你这个结构体占多大内存?存在哪个段?为什么?”答不上来,直接出门左转。

深度拆解 1:结构体与内存对齐的致命陷阱

我们先来看原版代码里的核心数据结构实体关系。

核心 ER 实体关系图(逻辑架构)

[Message 消息实体] 
  |-- sender (int)
  |-- target (int)
  |-- type   (enum)
  |-- seq    (int)
  |-- view   (int)
       |
       v (流向/存储于)
[Node 节点实体] (1对多关系,包含200容量的msg_queue)
  |-- id, type, state
  |-- current_seq, next_seq
  |-- prepare_count[50], commit_count[50]
  |-- has_sent_prepare[2000], has_sent_commit[2000]

避坑指南:小白写法的内存黑洞

来看原版代码里最要命的这段定义:

// 小白写法(原版节选)
typedef struct {
    // ... 前面省略
    BOOL has_sent_prepare[2000]; // 记录每个序列号是否已发送Prepare
    BOOL has_sent_commit[2000];  // 记录每个序列号是否已发送Commit
    // ...
} Node;

你以为 BOOL 只占 1 个 bit 吗?在 Windows API 里,BOOL 本质上是 int(4 字节)。

来算一笔账:

  • has_sent_prepare[2000] = 2000 * 4 = 8000 字节 (约 7.8 KB)

  • has_sent_commit[2000] = 8000 字节 (约 7.8 KB)

  • Node 里的其他数组和 Windows 句柄加起来,一个 Node 实例差不多要吃掉将近 20KB 的内存!

如果你 MAX_NODES 设为 50,总内存占用逼近 1MB!对于单片机来说,这就叫“代码未动,内存先崩”。

 高薪写法:位带操作与状态压缩

真正的老码农,知道 CPU 的 cache line 是宝贵的,内存总线的带宽是有限的。对于“是/否”这种状态,我们必须用到位图(Bitmap)和位带操作的思想。我们要把布尔数组压缩成位!

看这段工业级重构代码:

#include <stdint.h>
#include <stdbool.h>

// 14k 高薪架构级写法
// #pragma pack(push, 4) // 强制4字节对齐,防止编译器乱填充浪费内存
typedef struct {
    uint16_t id;          // 节点ID,用16位足够了(0-65535)
    uint8_t  type;        // 枚举类型本质是数字,限制在8位内
    uint8_t  state;       // 对齐凑够4字节,完美!CPU总线单次读取直接带走
    
    uint32_t current_sequence;  
    uint32_t next_expected_seq; 
    
    // 【架构师绝杀】:用 uint8_t 数组代替 int 数组统计票数,最大节点数50,8位上限255足够!
    // 原来 50*4 = 200字节,现在只要 50 字节!
    uint8_t  prepare_count[50]; 
    uint8_t  commit_count[50];
    
    // 【底层位图压缩】:2000个状态,用比特位存储!
    // 2000 bits / 32 = 62.5,取 63 个 uint32_t 即可搞定。
    // 内存从 8000 Bytes 暴降到 63 * 4 = 252 Bytes!压缩率高达 96.8%!
    uint32_t has_sent_prepare[63]; 
    uint32_t has_sent_commit[63];  
    
    // 省略 Windows 特有的 CS 和 HANDLE 锁定义...
} Node;
// #pragma pack(pop)

/* * 工业级注释:如何读写位图?必须用位运算!
 * 为什么这么写?
 * 硬件在这里发生了什么?CPU只支持按字节或字读取内存。
 * 我们通过 (seq / 32) 计算出这个请求在哪个 32位字 里,
 * 再通过 (seq % 32) 计算出在字里的第几个比特。
 * 最后用按位或(|)置1,这在底层最终会被编译成一条单独的 BIC/ORR 汇编指令!
 */
void set_prepare_flag(Node* node, int seq) {
    if (seq >= 2000) return; // 防御性编程,防指针越界飞线
    uint32_t index = seq / 32;
    uint32_t bit_pos = seq % 32;
    node->has_sent_prepare[index] |= (1 << bit_pos); // 寄存器级位操作
}

bool check_prepare_flag(Node* node, int seq) {
    if (seq >= 2000) return false;
    uint32_t index = seq / 32;
    uint32_t bit_pos = seq % 32;
    return (node->has_sent_prepare[index] & (1 << bit_pos)) != 0;
}

深度对比:小白 vs 14k 内存账本表

考察维度 原版小白写法 (BOOL[2000]) 14k 架构写法 (uint32_t[63]) 硬件底层意义
单项内存占用 8000 字节 252 字节 避免 SRAM 溢出,提高 L1 Cache 命中率
总线传输开销 极高(需频繁换页) 极低(连续小段内存) ARM AHB/APB 总线传输无延迟
赋值操作指令 array[i] = 1 (写32位) ` = (1 << n)` (读-改-写位)

深度拆解 2:堆区的死神(Malloc 的原罪)

看你 main 函数里的这句:nodes = (Node *)malloc(N_NODES * sizeof(Node));

在 C 语言的课本里,这么写没问题。但在真正长年不死机、要求高可靠性的底层系统里,严禁使用动态内存分配(malloc)

面试官会问你:“你的系统跑了 3 个月后死机了,为什么?”老码农告诉你:因为内存碎片(Memory Fragmentation)。你不断地 malloc 和 free,堆区(Heap)会被切得七零八落。到了最后,虽然系统还有剩余总内存,但就是找不到一块连续的、足够大的内存给你,malloc 返回 NULL,你没做判断,紧接着指针偏移写数据——“啪”,段错误(SegFault),系统崩溃重启。

14k 高薪写法:静态内存池(Memory Pool)与 BSS 段

必须采用预分配机制,把内存锁死在编译期!

// 14k 高薪写法:静态分配
#define MAX_SUPPORTED_NODES 50

// 把节点数组直接扔进 BSS 段(未初始化全局数据区)
// 硬件层面:程序烧录进 Flash,启动时 C 库的 startup.s 汇编代码会自动将这块 RAM 清零。
// 绝不产生任何内存碎片,永远不会分配失败!
static Node g_nodes_pool[MAX_SUPPORTED_NODES]; 
static int g_active_nodes_count = 0;

void initialize_nodes_robust(int requested_nodes) {
    // 工业级防御:永远不要相信外部传入的参数
    if (requested_nodes > MAX_SUPPORTED_NODES) {
        // 在裸机里这里可能是个 while(1) 闪烁红灯报警,或者串口打印
        printf("FATAL: 节点数超限!\n");
        exit(-1); 
    }
    
    g_active_nodes_count = requested_nodes;
    
    // 初始化逻辑(无需 malloc,直接用预分配的池子)
    for (int i = 0; i < g_active_nodes_count; i++) {
        g_nodes_pool[i].id = i;
        g_nodes_pool[i].state = IDLE;
        // ... 省略其余初始化
    }
}

看这个逻辑判定,体会下从堆到静态池的思想转变:

Plaintext

内存分配抉择树
 |-- 你的代码用 malloc()
 |    |-- 成功: 拿到了 Heap 里的连续内存
 |    |-- 失败: 产生碎片 -> 返回 NULL -> 指针悬空 -> 野指针越界 -> 宕机
 |
 |-- 14k 架构师用 static Array
      |-- 编译期决断: RAM 够不够?不够直接编译报错 (Region RAM overflowed)
      |-- 运行期保障: 地址固定不变,通过 MPU(内存保护单元) 可直接锁定此区域,安全无敌!

阶段总结提炼 (1/3)

这节课主要是把你脑子里“力扣式”的软件思维,强行拉回到硅片上的物理硬件思维。来,看这个思维导图总结:

  • 嵌入式 C 底层思维觉醒

    • 结构体不仅是打包数据的,更是用来贴合 CPU 总线的

      • 禁用无意义的大数组,用 uint8_t / uint16_t 缩减尺寸

      • 使用 Bitmaps (位图) 极致压缩二元状态

    • 内存分配不是向操作系统要,而是对物理 RAM 的精准规划

      • 抛弃 Heap (堆区) 和 malloc,拥抱 BSS/Data (静态/全局区)

      • 编译期防患于未然,好过运行时的追悔莫及


深度拆解 3:扯下操作系统的遮羞布——中断与临界区

面试官会在这埋伏你:“你代码里的 EnterCriticalSection,在单片机底层到底发生了什么?”

小白会回答:“为了防止别的线程进来抢数据。”

废话!我要听的是硬件原理!

1. 虚假的事件 vs 真实的硬件中断

在你的原版仿真里,NodeThread 是一个死循环,平时休眠,等 msg_queue_event 被触发。这在底层实际上对应的是中断服务例程(ISR, Interrupt Service Routine)

在真实的分布式节点(比如通过以太网或 CAN 总线收发消息)中,当一帧 PBFT 的消息到达网卡芯片时,网卡会触发 CPU 内部的 NVIC(嵌套向量中断控制器),强制 CPU 暂停当前程序,跳转到中断处理函数。

2. 暴力的锁:裸机环境下的临界区

在嵌入式底层,保护全局资源(比如你的消息计数器)的最高效手段是全局关中断

逻辑判定树:

  1. 是否有多个实体会同时修改 msg_queue_size?

    • :不需要锁,直接跑,追求极致吞吐量。

    • :谁在抢?

      • 两个普通 Task 在抢:使用 RTOS 的 Mutex (互斥量)。

      • Task 和 硬件中断(ISR) 在抢:致命危险!必须关中断 (Disable Interrupts)!

14k 架构师必懂的底层实现(ARM 汇编级):

/* * 为什么这里要加 volatile 和 memory 屏障?
 * 因为编译器可能会自作聪明地重新排序你的指令。
 * 加上 memory 屏障,强制让 CPU 把之前没写完的数据全部写进 RAM,防止数据不一致。
 */
static inline void Enter_Critical_Hardware(void) {
    __asm volatile ("CPSID i" : : : "memory"); // 指令:关闭全局中断
}

static inline void Exit_Critical_Hardware(void) {
    __asm volatile ("CPSIE i" : : : "memory"); // 指令:开启全局中断
}

深度拆解 4:O(N) 搬运工的末日——环形缓冲区(Ring Buffer)重构

我们要扒的第二个毒瘤,是你代码里处理消息队列的这一段:

// 原版小白代码:死亡搬运工
Message msg = node->msg_queue[0];
for (int i = 0; i < node->msg_queue_size - 1; i++) {
    node->msg_queue[i] = node->msg_queue[i + 1]; // 致命的 O(N) 内存搬运
}

这是我最看不惯的写法!你的队列有 200 个容量,当你取队头消息时,要把后面 199 个消息全部往前挪一步。这叫吃饱了撑的浪费 CPU 周期!在中断里做这种搬运,系统响应会飙升,导致丢包。

高薪写法:无锁化 / 低延迟环形缓冲区

#include <stdint.h>
#include <stdbool.h>

// 消息结构体:使用紧凑对齐,防止编译器插入垃圾字节
#pragma pack(push, 1)
typedef struct {
    uint16_t sender;
    uint16_t target;
    uint8_t  type;    
    uint32_t sequence; 
    uint32_t view; 
} Message_t;
#pragma pack(pop)

// 【架构师绝杀】:容量必须是 2 的 N 次方!
// 用 & (SIZE-1) 替代取余 (%),单片机算位运算只要 1 个周期,算取余可能要 50 个周期!
#define QUEUE_SIZE 256 
#define QUEUE_MASK (QUEUE_SIZE - 1)

typedef struct {
    Message_t buffer[QUEUE_SIZE];
    // volatile 关键字:防止编译器优化,强制从内存读取,这是嵌入式并发的命门
    volatile uint32_t head; // 读指针
    volatile uint32_t tail; // 写指针
} RingBuffer_t;

// 14k 薪资写法:压入消息(通常在接收中断 ISR 中调用)
bool RingBuffer_Push(RingBuffer_t* rb, Message_t* msg) {
    uint32_t next_tail = (rb->tail + 1) & QUEUE_MASK;
    
    // 检查是否溢出:如果下一个写位置等于读位置,说明塞满了
    if (next_tail == rb->head) {
        // 工业级处理:记录丢包次数,甚至触发自愈逻辑
        return false; 
    }
    
    // 硬件发生了什么:CPU直接寻址写入,一行代码解决,效率提升 200%
    rb->buffer[rb->tail] = *msg; 
    rb->tail = next_tail;
    return true;
}

// 弹出消息(在主任务循环中调用)
bool RingBuffer_Pop(RingBuffer_t* rb, Message_t* msg) {
    if (rb->head == rb->tail) return false; // 队列空,秒回,不阻塞主线程
    
    *msg = rb->buffer[rb->head];
    rb->head = (rb->head + 1) & QUEUE_MASK;
    return true;
}

深度拆解 5:PBFT 有限状态机(FSM)的工业级重构

你原代码里的 switch-case 放在多线程里跑,逻辑是散的。在分布式共识系统里,每个节点都是一个复杂的有限状态机。如果状态流转不严谨,节点就会陷入“活锁”。

不同实现方式的对比分析表

维度 小白写法(If-Else 嵌套) 14k 架构师(有限状态机 FSM) 硬件底层影响
代码路径 路径不确定,执行时间剧烈抖动 路径固定,确定性极强 影响实时系统的 Worst-Case 响应时间
调试难度 很难复现特定逻辑分支的 Bug 通过状态跳转表一眼定位问题 方便使用逻辑分析仪抓取状态引脚
内存开销 大量局部变量存在栈里,容易溢出 状态数据集中在静态区,栈开销极小 防止堆栈穿透导致硬错误 (HardFault)

PBFT 核心逻辑流(二叉树分叉展示)

1- 消息包到达 (MSG_RECEIVED)
   |-- 2.1 校验失败 (Invalid Sig/Seq) -> 直接 DROP,释放缓冲区
   |-- 2.2 校验成功 (Valid)
       |-- 3.1 状态 = IDLE 
           |--> 执行 handle_pre_prepare() -> 广播 PREPARE -> 状态切至 PRE_PREPARED
       |-- 3.2 状态 = PRE_PREPARED
           |--> 统计 Prepare 票数 -> 满 QUORUM -> 广播 COMMIT -> 状态切至 PREPARED
       |-- 3.3 状态 = PREPARED
           |--> 统计 Commit 票数 -> 满 QUORUM -> 执行共识请求 -> 回到 IDLE

本章总结提炼:超越 14k 的核心点

  • 避坑指南: 绝对不要在关中断(Critical Section)里写 printffor 循环。printf 内部涉及串口波特率等待,会阻塞 CPU,导致其他高优先级中断丢失。

  • 面试杀手锏: 如果面试官问你如何优化吞吐量,告诉他:“我会利用 Cache Line 对齐 RingBuffer 的读写指针,防止伪共享(False Sharing)。” 这一句话就能让他觉得你起码值 18k。

  • 思维导图总结:

    • 并发控制:中断级关中断 > RTOS 信号量 > Windows 锁。

    • 数据交换:RingBuffer 是唯一王道,O(1) 复杂度干掉 O(N)。

    • 逻辑建模:严禁面条代码,必须通过 FSM 状态机进行严格流转。


不仅重写了你的消息队列,还带你下到了汇编和总线层面。你要明白,你能保证这套 C 语言在资源极度匮乏、环境极度恶劣的板子上跑得稳、跑得快。

深度拆解 6:主节点反水了怎么办?——视图切换的逻辑核爆

面试官最喜欢问:“PBFT 怎么保证活性(Liveness)?”

如果你只回答“超时切换”,那你可以回去等通知了。我要听的是:“如何通过状态机的一致性转换,在不停止服务的情况下完成权力的交接。”

1. 视图切换的实体关系图(ER 图)

在底层实现中,视图(View)不是一个简单的数字,它是一套状态映射关系

[View 视图实体]
  |-- ViewID (uint32_t): 当前权力代号
  |-- PrimaryID (ViewID % N): 谁是老大(由代号决定,公平轮询)
  |-- Timer (Hardware_Timer): 活性的守护神
       |
       v (触发)
[ViewChange 消息包]
  |-- NewViewID
  |-- Last_Stable_Checkpoint
  |-- Set_of_Prepared_Messages (证明自己没撒谎)

2. 避坑指南:小白最容易写的“递归死循环”

很多刚入行的程序员,喜欢在超时处理函数里直接调用 start_view_change(),而 start_view_change 里又去重置定时器。如果网络抖动,一堆节点就会陷入疯狂切视图的死锁。

14k 架构师写法:阶梯式超时退避策略。

就像你平时追女生,发个信息没回,你不能每秒钟弹窗一次,你得等 1 分钟、5 分钟、20 分钟……这在底层叫 Exponential Backoff(指数退避)


深度拆解 7:别再用 Sleep 了!——硬件定时器与时间轮架构

看你原版代码里的 Sleep(10)。在嵌入式驱动开发里,用 Sleep 是要被开除的!Sleep 会让 CPU 空转或强行挂起任务,导致其他紧急任务(比如过温保护)无法响应。

我们需要的是底层硬件定时器(Hardware Timer)

1. 定时器在硅片上的真相

CPU 内部有一个计数器寄存器(CNT),它每隔几个纳秒自增一次。当它加到我们设定的阈值时,会产生一个硬件中断。

2. 高薪写法:高性能时间轮(Time Wheel)驱动

如果你有 50 个节点,每个节点都有 3 个超时闹钟,难道你要开 150 个硬件定时器?MCU 没那么多资源。架构师会用时间轮算法,用一个硬件定时器模拟出成千上万个软件闹钟。

#include <stdint.h>

// 14k 架构师级:闹钟结构体
typedef struct {
    uint32_t timeout_ms;    // 目标到期时间
    void (*callback)(void); // 到期后的回调函数(比如触发视图切换)
    bool is_active;         // 是否启用
} SoftTimer_t;

// 全局时间戳(在每 1ms 一次系统滴答中断中自增)
static volatile uint32_t g_system_ticks = 0; 

// 【底层灵魂】:系统滴答中断服务程序 (SysTick ISR)
// 硬件每 1ms 自动跳进来一次
void SysTick_Handler(void) {
    g_system_ticks++;
}

// 工业级非阻塞超时检查
void PBFT_Timer_Task(SoftTimer_t* timers, int count) {
    for (int i = 0; i < count; i++) {
        if (timers[i].is_active) {
            // 使用“差值法”防止 uint32_t 溢出导致的逻辑错误
            // 这是深穗莞老码农的保命手段:(当前时间 - 开始时间 > 间隔)
            if ((uint32_t)(g_system_ticks - timers[i].timeout_ms) < 0x7FFFFFFF) {
                timers[i].is_active = false;
                timers[i].callback(); // 杀伐果断,到点执行
            }
        }
    }
}

深度拆解 8:从“空中阁楼”到“工业级共识”的代码重构

现在,我把视图切换的核心代码按工业级标准重写给你。注意看里面的防御性编程寄存器思想

C

// 视图切换状态定义
typedef enum {
    VIEW_STABLE,      // 视图稳定,正常共识
    VIEW_CHANGING,    // 正在切换,拒绝正常请求
    VIEW_RECOVERING   // 恢复中
} ViewState_t;

static ViewState_t g_current_v_state = VIEW_STABLE;
static uint32_t g_view_number = 0;

/*
 * 【14k 高薪写法】:视图切换触发器
 * 为什么要传 reason?为了做系统诊断(Diagnostics)。
 * 在车载底层,每一个视图切换都要记录进黑匣子(Flash 存储)。
 */
void Trigger_View_Change(uint8_t reason) {
    if (g_current_v_state == VIEW_CHANGING) return; // 防止重入攻击
    
    Enter_Critical_Hardware(); // 关中断,保证切换过程原子性
    
    g_current_v_state = VIEW_CHANGING;
    g_view_number++; // 开启新纪元
    
    // 硬件动作:清空之前的环形缓冲区,因为旧视图的消息已经失效了
    RingBuffer_Clear(&g_msg_queue); 
    
    // 广播 VIEW-CHANGE 消息
    // 注意:这里用的是我们第二章写的无锁发送逻辑
    PBFT_Broadcast_ViewChange(g_view_number, reason);
    
    Exit_Critical_Hardware();
    
    // 打印诊断信息(仅限调试模式)
    // DEBUG_LOG("ViewChange Triggered! New View: %d, Reason: %d", g_view_number, reason);
}

终极对比:小白仿真 vs 14k 架构级系统

特性 小白仿真写法(你之前的代码) 14k 架构师写法(重构后) 核心价值
超时检测 Sleep(100) 后检查变量 SysTick 硬件中断 + 时间轮 毫秒级精度,0 延迟响应
异常处理 只有正常的共识流 完备的 View Change 状态机 保证系统在极端环境下不死机
消息过滤 来者不拒,容易被洪水攻击塞满 视图校验 + 序列号滑动窗口 极强的抗干扰和抗攻击能力
代码移植 绑定 Windows.h,只能跑在电脑上 纯 C 语言 + 硬件抽象层 (HAL) 一套代码通杀 STM32、NXP、TI 平台

本章思维导图:PBFT 灵魂总结

  • 视图切换 (View Change)

    • 触发机制:请求超时、主节点签名非法、检测到拜占庭行为。

    • 安全性:必须收集到 $2f+1$ 个 View-Change 证明,新视图才合法。

  • 硬件定时器 (Timer)

    • 原则:严禁阻塞,拥抱中断。

    • 技巧:用 uint32_t 溢出回绕处理时间差。

  • 系统活性 (Liveness)

    • 指数退避,防止网络风暴。

    • 状态机自愈,从崩溃边缘强行拉回。


一句话: 路在脚下

内存对齐的比特位,到环形队列的指针偏移,再到硬件中断的临界区保护。0ffer,靠的不是背几道题,而是当你站在面试官面前,你能指着屏幕上的代码告诉他:“我这行代码,考虑到了 CPU 的 Cache Line,考虑到了 SRAM 的碎片化,还考虑到了中断嵌套的原子性。

附录,千行源码:

#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>

// ==================== 配置参数 ====================
// 可根据需要调整这些参数来测试不同规模的PBFT网络
#define MIN_NODES 10          // 最小节点数 - 确保网络规模不会太小
#define MAX_NODES 50          // 最大节点数 - 防止网络规模过大导致性能问题
#define REQUESTS_COUNT 1500   // 共识请求数量 (1000-2000范围内) - 测试多请求处理能力
#define BYZANTINE_RATIO 0.2   // 拜占庭节点比例 (20%) - 模拟网络中恶意节点的比例
#define MIN_LATENCY 5         // 最小网络延迟 (ms) - 模拟真实网络的基础延迟
#define MAX_LATENCY 100       // 最大网络延迟 (ms) - 模拟网络延迟的上限
#define PACKET_LOSS_RATE 0.05 // 网络丢包率 (5%) - 模拟网络不稳定情况
#define EXTRA_DELAY_RATE 0.1  // 额外延迟概率 (10%) - 模拟网络抖动

// ==================== 数据结构定义 ====================

// 消息类型枚举 - PBFT协议的三阶段消息
typedef enum
{
    PRE_PREPARE, // 预准备阶段 - 由主节点发起
    PREPARE,     // 准备阶段 - 由副本节点广播
    COMMIT       // 提交阶段 - 达成共识后执行
} MsgType;

// 节点类型枚举 - 区分诚实节点和拜占庭节点
typedef enum
{
    HONEST,   // 诚实节点 - 遵循协议规则
    BYZANTINE // 拜占庭节点 - 可能表现出恶意行为
} NodeType;

// 节点状态枚举 - 跟踪节点在PBFT协议中的当前状态
typedef enum
{
    IDLE,         // 空闲状态 - 未参与任何共识
    PRE_PREPARED, // 已收到预准备消息
    PREPARED,     // 已收集到足够准备消息(达到法定人数)
    COMMITTED     // 已收集到足够提交消息,共识达成
} NodeState;

// 消息结构体 - 封装PBFT协议消息的所有必要信息
typedef struct
{
    int sender;   // 发送者ID
    int target;   // 目标节点ID(广播时可能为-1)
    MsgType type; // 消息类型(PRE_PREPARE/PREPARE/COMMIT)
    int sequence; // 请求序列号 - 用于区分不同的客户端请求
    int view;     // 视图号(用于视图切换),一直在往后运行
} Message;

// 节点结构体 - 每个节点的完整状态信息
typedef struct
{
    int id;                      // 节点唯一标识符
    NodeType type;               // 节点类型(诚实/拜占庭)
    NodeState state;             // 当前协议状态
    int current_sequence;        // 当前正在处理的请求序列号
    int next_expected_seq;       // 下一个期望接收的请求序列号(用于防止重放攻击)
    int prepare_count[50];       // 记录每个节点是否发送了Prepare消息(支持最多50个节点)
    int commit_count[50];        // 记录每个节点是否发送了Commit消息
    BOOL has_sent_prepare[2000]; // 记录每个序列号是否已发送Prepare(防止重复发送)
    BOOL has_sent_commit[2000];  // 记录每个序列号是否已发送Commit(防止重复发送)
    CRITICAL_SECTION msg_cs;     // 消息队列的临界区 - 保护队列的线程安全访问
    HANDLE msg_queue_event;      // 消息队列事件 - 用于通知线程有新消息到达
    Message msg_queue[200];      // 消息队列缓冲区 - 存储待处理的消息(增加队列大小以处理多请求)
    int msg_queue_size;          // 当前队列中的消息数量
    int completed_requests;      // 已成功完成的请求数量
} Node;

// ==================== 全局变量 ====================
// 这些变量在整个程序中被所有线程共享
int N_NODES;                       // 实际创建的节点总数(随机在MIN_NODES到MAX_NODES之间)
int F_TOLERANCE;                   // 系统容错能力 - 最多能容忍的拜占庭节点数(f = (n-1)/3)
int QUORUM;                        // 法定人数 - 达成共识所需的最小节点数(2f+1)
int BYZANTINE_NODES;               // 实际的拜占庭节点数量
Node *nodes;                       // 所有节点的数组指针
CRITICAL_SECTION global_cs;        // 全局临界区 - 保护全局统计变量的线程安全
HANDLE consensus_complete_event;   // 全局共识完成事件 - 通知所有线程停止运行
int total_messages_sent = 0;       // 全局统计:总共发送的消息数量
int total_consensus_success = 0;   // 全局统计:成功达成共识的请求数量
int total_consensus_failed = 0;    // 全局统计:失败的共识请求数量
double total_consensus_time = 0.0; // 全局统计:总共识时间(秒)

// ==================== 网络模拟函数 ====================

// 网络延迟模拟 - 更真实的网络环境
// 模拟真实网络中的延迟、抖动和丢包现象
void simulate_network_delay()
{
    // 基础延迟:在MIN_LATENCY到MAX_LATENCY之间随机选择
    int delay = MIN_LATENCY + rand() % (MAX_LATENCY - MIN_LATENCY);

    // 网络抖动:10%的概率增加额外的0-200ms随机延迟
    if ((rand() % 100) < 10)
    {
        delay += rand() % 200; // 额外0-200ms抖动
    }

    // 网络丢包模拟:PACKET_LOSS_RATE概率模拟严重丢包(5秒超时)
    // 注意:这里不是真正丢弃消息,而是通过大幅增加延迟来模拟丢包效果
    if ((rand() % 100) < (int)(PACKET_LOSS_RATE * 100))
    {
        delay += 5000; // 模拟严重丢包(5秒超时)
    }

    Sleep(delay); // 实际执行延迟
}

// ==================== 消息传递函数 ====================

// 发送消息到指定节点
// 这是生产者-消费者模型中的"生产者"部分
void send_message_to_node(int sender, int target, MsgType type, int sequence, int view)
{
    // 参数验证:确保目标节点和发送者节点ID有效
    if (target < 0 || target >= N_NODES)
        return;
    if (sender < 0 || sender >= N_NODES)
        return;

    // 拜占庭节点恶意行为模拟
    // 如果发送者是拜占庭节点,则有一定概率表现出恶意行为
    if (nodes[sender].type == BYZANTINE)
    {
        int malicious_action = rand() % 100;
        if (malicious_action < 30)
        {
            // 30%概率丢弃消息 - 完全不发送
            return;
        }
        else if (malicious_action < 45)
        {
            // 15%概率篡改序列号 - 发送错误的序列号
            sequence = rand() % REQUESTS_COUNT + 1;
        }
        else if (malicious_action < 60)
        {
            // 15%概率篡改视图号 - 发送错误的视图号
            view = rand() % 10;
        }
        // 其他情况正常发送(但可能有延迟)- 40%概率正常行为
    }

    // 模拟网络延迟 - 在实际发送前等待随机时间
    simulate_network_delay();

    // 关键:将消息放入目标节点的消息队列(生产者操作)
    // 使用目标节点的临界区保护队列访问的线程安全
    EnterCriticalSection(&nodes[target].msg_cs);
    if (nodes[target].msg_queue_size < 200) // 队列未满
    {
        // 将消息复制到目标节点的队列末尾
        nodes[target].msg_queue[nodes[target].msg_queue_size].sender = sender;
        nodes[target].msg_queue[nodes[target].msg_queue_size].target = target;
        nodes[target].msg_queue[nodes[target].msg_queue_size].type = type;
        nodes[target].msg_queue[nodes[target].msg_queue_size].sequence = sequence;
        nodes[target].msg_queue[nodes[target].msg_queue_size].view = view;
        nodes[target].msg_queue_size++; // 队列大小增加

        // 触发目标节点的消息队列事件 - 唤醒目标节点的线程
        SetEvent(nodes[target].msg_queue_event);

        // 更新全局消息统计(需要全局临界区保护)
        EnterCriticalSection(&global_cs);
        total_messages_sent++;
        LeaveCriticalSection(&global_cs);
    }
    // 如果队列已满,消息会被丢弃(生产者-消费者缓冲区溢出)
    LeaveCriticalSection(&nodes[target].msg_cs); // 释放目标节点的临界区
}

// 广播消息
// 向除自己外的所有节点发送消息
void broadcast_message(int sender, MsgType type, int sequence, int view)
{
    for (int i = 0; i < N_NODES; i++)
    {
        if (i != sender) // 不向自己发送消息
        {
            send_message_to_node(sender, i, type, sequence, view);
        }
    }
}

// ==================== 消息处理函数 ====================

// 处理Pre-prepare消息
// 当副本节点收到主节点的PRE-PREPARE消息时调用
void handle_pre_prepare(Node *node, Message *msg)
{
    // 消息验证:检查序列号是否有效
    // 如果序列号小于等于next_expected_seq-1,说明是重复或过期的消息
    if (msg->sequence <= node->next_expected_seq - 1)
    {
        // 重复或过期的消息,忽略 - 防止重放攻击和状态混乱
        return;
    }

    // 拜占庭节点行为:40%概率不处理消息
    if (node->type == BYZANTINE)
    {
        if (rand() % 100 < 40)
        {
            return; // 40%概率不处理
        }
    }

    // 处理有效的PRE-PREPARE消息
    if (msg->sequence >= node->next_expected_seq)
    {
        // 更新节点状态
        node->current_sequence = msg->sequence; // 记录当前处理的序列号
        node->state = PRE_PREPARED;             // 进入PRE-PREPARED状态

        // 防止重复发送PREPARE消息
        if (!node->has_sent_prepare[msg->sequence])
        {
            node->has_sent_prepare[msg->sequence] = TRUE;
            // 广播PREPARE消息给其他所有节点
            broadcast_message(node->id, PREPARE, msg->sequence, msg->view);
        }
    }
}

// 处理Prepare消息
// 当节点收到其他节点的PREPARE消息时调用
void handle_prepare(Node *node, Message *msg)
{
    // 序列号验证:确保消息属于当前正在处理的请求
    if (msg->sequence != node->current_sequence)
    {
        return; // 序列号不匹配,忽略 - 防止不同请求的消息混淆
    }

    // 拜占庭节点行为:25%概率不处理消息
    if (node->type == BYZANTINE)
    {
        if (rand() % 100 < 25)
        {
            return; // 25%概率不处理
        }
    }

    // 处理有效的PREPARE消息
    if (msg->sequence == node->current_sequence && node->state == PRE_PREPARED)
    {
        // 记录发送者已发送PREPARE消息
        node->prepare_count[msg->sender] = 1;

        // 统计收到的PREPARE消息总数
        int prepare_total = 0;
        for (int i = 0; i < N_NODES; i++)
        {
            prepare_total += node->prepare_count[i];
        }

        // 检查是否达到法定人数(2f+1)
        if (prepare_total >= QUORUM)
        {
            // 达到法定人数,进入PREPARED状态
            node->state = PREPARED;

            // 防止重复发送COMMIT消息
            if (!node->has_sent_commit[msg->sequence])
            {
                node->has_sent_commit[msg->sequence] = TRUE;
                // 广播COMMIT消息给其他所有节点
                broadcast_message(node->id, COMMIT, msg->sequence, msg->view);
            }
        }
    }
}

// 处理Commit消息
// 当节点收到其他节点的COMMIT消息时调用
void handle_commit(Node *node, Message *msg)
{
    // 序列号验证:确保消息属于当前正在处理的请求
    if (msg->sequence != node->current_sequence)
    {
        return; // 序列号不匹配,忽略
    }

    // 拜占庭节点行为:15%概率不处理消息
    if (node->type == BYZANTINE)
    {
        if (rand() % 100 < 15)
        {
            return; // 15%概率不处理
        }
    }

    // 处理有效的COMMIT消息
    if (msg->sequence == node->current_sequence && node->state == PREPARED)
    {
        // 记录发送者已发送COMMIT消息
        node->commit_count[msg->sender] = 1;

        // 统计收到的COMMIT消息总数
        int commit_total = 0;
        for (int i = 0; i < N_NODES; i++)
        {
            commit_total += node->commit_count[i];
        }

        // 检查是否达到法定人数(2f+1)
        if (commit_total >= QUORUM)
        {
            // 共识达成!
            node->state = COMMITTED;
            node->completed_requests++;                  // 增加已完成请求数
            node->next_expected_seq = msg->sequence + 1; // 更新期望的下一个序列号

            // 重置计数器为下一个请求准备
            memset(node->prepare_count, 0, sizeof(node->prepare_count));
            memset(node->commit_count, 0, sizeof(node->commit_count));
            node->current_sequence = 0;
            node->state = IDLE;

            // 更新全局成功共识统计
            EnterCriticalSection(&global_cs);
            total_consensus_success++;
            LeaveCriticalSection(&global_cs);
        }
    }
}

// ==================== 节点主线程函数 ====================

// 节点主线程
// 每个节点运行一个独立的线程,实现并发执行
DWORD WINAPI NodeThread(LPVOID lpParam)
{
    Node *node = (Node *)lpParam; // 获取节点指针

    // 主节点负责发起请求
    // 只有ID为0且类型为诚实的节点才是主节点
    if (node->id == 0 && node->type == HONEST)
    {
        Sleep(100); // 等待其他节点启动 - 确保所有副本节点都已准备好

        // 初始化高精度性能计数器用于测量共识时间
        LARGE_INTEGER frequency, start, end;
        QueryPerformanceFrequency(&frequency);

        // 循环处理多个连续的客户端请求
        for (int seq = 1; seq <= REQUESTS_COUNT; seq++)
        {
            QueryPerformanceCounter(&start); // 记录开始时间

            // 发起新的请求 - 广播PRE-PREPARE消息
            broadcast_message(0, PRE_PREPARE, seq, 0);

            // 等待当前请求完成或超时(最多100ms)
            // 这里使用轮询方式检查completed_requests是否达到当前序列号
            int wait_time = 0;
            while (node->completed_requests < seq && wait_time < 100)
            {
                Sleep(10);
                wait_time += 10;
            }

            // 计算本次共识耗时
            QueryPerformanceCounter(&end);
            double elapsed = (double)(end.QuadPart - start.QuadPart) / frequency.QuadPart;

            // 更新全局总共识时间统计
            EnterCriticalSection(&global_cs);
            total_consensus_time += elapsed;
            LeaveCriticalSection(&global_cs);

            // 小延迟避免消息风暴 - 防止连续请求造成网络拥塞
            Sleep(1);
        }

        // 所有请求处理完成后,通知所有节点停止运行
        SetEvent(consensus_complete_event);
    }

    // 消息处理循环 - 所有节点(包括主节点)都执行此循环
    // 采用双重等待模式:外层检查全局停止事件,内层等待本地消息
    while (WaitForSingleObject(consensus_complete_event, 10) == WAIT_TIMEOUT)
    {
        // 内层等待:等待本地消息队列事件(最多10ms)
        // 无论是否有消息到达,都会继续执行后续代码
        WaitForSingleObject(node->msg_queue_event, 10);

        // 进入临界区保护消息队列访问
        EnterCriticalSection(&node->msg_cs);
        if (node->msg_queue_size > 0)
        {
            // 从队列头部取出消息(FIFO)
            Message msg = node->msg_queue[0];
            // 队列前移 - 将后面的消息逐个向前移动
            for (int i = 0; i < node->msg_queue_size - 1; i++)
            {
                node->msg_queue[i] = node->msg_queue[i + 1];
            }
            node->msg_queue_size--;              // 队列大小减1
            ResetEvent(node->msg_queue_event);   // 重置事件,为下一次消息到达做准备
            LeaveCriticalSection(&node->msg_cs); // 释放临界区(处理消息不需要临界区)

            // 根据消息类型分发处理
            switch (msg.type)
            {
            case PRE_PREPARE:
                handle_pre_prepare(node, &msg);
                break;
            case PREPARE:
                handle_prepare(node, &msg);
                break;
            case COMMIT:
                handle_commit(node, &msg);
                break;
            default:
                break;
            }
        }
        else
        {
            // 队列为空,直接释放临界区
            LeaveCriticalSection(&node->msg_cs);
        }
    }

    return 0;
}

// ==================== 初始化和清理函数 ====================

// 初始化节点
// 分配内存并初始化所有节点的状态
void initialize_nodes()
{
    nodes = (Node *)malloc(N_NODES * sizeof(Node));

    for (int i = 0; i < N_NODES; i++)
    {
        nodes[i].id = i;
        // 确保主节点(ID=0)是诚实的,其他节点按比例分配拜占庭/诚实类型
        nodes[i].type = (i == 0) ? HONEST : (i <= BYZANTINE_NODES) ? BYZANTINE
                                                                   : HONEST;
        nodes[i].state = IDLE;
        nodes[i].current_sequence = 0;
        nodes[i].next_expected_seq = 1;
        nodes[i].completed_requests = 0;
        nodes[i].msg_queue_size = 0;
        InitializeCriticalSection(&nodes[i].msg_cs); // 初始化每个节点的临界区
        // 创建自动重置事件(第二个参数FALSE)用于消息队列通知
        nodes[i].msg_queue_event = CreateEvent(NULL, FALSE, FALSE, NULL);
        // 初始化所有计数数组为0
        memset(nodes[i].prepare_count, 0, sizeof(nodes[i].prepare_count));
        memset(nodes[i].commit_count, 0, sizeof(nodes[i].commit_count));
        memset(nodes[i].has_sent_prepare, 0, sizeof(nodes[i].has_sent_prepare));
        memset(nodes[i].has_sent_commit, 0, sizeof(nodes[i].has_sent_commit));
    }
}

// 清理资源
// 释放所有分配的资源,防止内存泄漏
void cleanup_nodes()
{
    for (int i = 0; i < N_NODES; i++)
    {
        DeleteCriticalSection(&nodes[i].msg_cs); // 删除临界区
        CloseHandle(nodes[i].msg_queue_event);   // 关闭事件句柄
    }
    free(nodes); // 释放节点数组内存
}

// ==================== 主函数 ====================

// 主函数 - 程序入口点
int main()
{
    // 初始化随机数种子 - 确保每次运行结果不同
    srand((unsigned int)time(NULL));
    // 初始化全局临界区
    InitializeCriticalSection(&global_cs);
    // 创建手动重置事件(第二个参数TRUE)用于全局停止信号
    consensus_complete_event = CreateEvent(NULL, TRUE, FALSE, NULL);

    // 动态配置网络参数
    // 随机选择节点数量(10-50之间)
    N_NODES = MIN_NODES + rand() % (MAX_NODES - MIN_NODES + 1);
    // 确保满足3f+1约束(PBFT理论要求)
    F_TOLERANCE = (N_NODES - 1) / 3;
    if (F_TOLERANCE < 1)
        F_TOLERANCE = 1;                                // 至少容忍1个故障节点
    QUORUM = 2 * F_TOLERANCE + 1;                       // 法定人数 = 2f+1
    BYZANTINE_NODES = (int)(N_NODES * BYZANTINE_RATIO); // 计算拜占庭节点数量
    if (BYZANTINE_NODES >= N_NODES)
        BYZANTINE_NODES = N_NODES - 1; // 确保至少有一个诚实节点

    // 打印仿真配置信息
    printf("=== 高级PBFT多请求仿真 ===\n");
    printf("节点数量: %d\n", N_NODES);
    printf("容错能力: %d\n", F_TOLERANCE);
    printf("法定人数: %d\n", QUORUM);
    printf("拜占庭节点: %d\n", BYZANTINE_NODES);
    printf("共识请求数: %d\n", REQUESTS_COUNT);
    printf("网络丢包率: %.1f%%\n", PACKET_LOSS_RATE * 100);
    printf("================================\n\n");

    // 初始化所有节点
    initialize_nodes();

    // 创建线程 - 为每个节点创建独立的执行线程
    HANDLE *threads = (HANDLE *)malloc(N_NODES * sizeof(HANDLE));
    for (int i = 0; i < N_NODES; i++)
    {
        threads[i] = CreateThread(NULL, 0, NodeThread, &nodes[i], 0, NULL);
    }

    // 等待所有请求完成或超时
    // 超时时间 = REQUESTS_COUNT * 100ms(每个请求最多100ms)
    DWORD wait_result = WaitForSingleObject(consensus_complete_event, REQUESTS_COUNT * 100);

    if (wait_result == WAIT_OBJECT_0)
    {
        printf("所有共识请求处理完成!\n");
    }
    else
    {
        printf("超时,部分请求未完成。\n");
    }

    // 等待所有线程结束(最多5秒)
    WaitForMultipleObjects(N_NODES, threads, TRUE, 5000);

    // 计算最终统计信息
    total_consensus_failed = REQUESTS_COUNT - total_consensus_success;
    double success_rate = (double)total_consensus_success / REQUESTS_COUNT * 100;
    double avg_consensus_time = total_consensus_time / total_consensus_success;
    double throughput = total_consensus_success / total_consensus_time;

    // 输出详细的仿真结果统计
    printf("\n=== 仿真结果统计 ===\n");
    printf("总请求数: %d\n", REQUESTS_COUNT);
    printf("成功共识: %d\n", total_consensus_success);
    printf("失败共识: %d\n", total_consensus_failed);
    printf("成功率: %.2f%%\n", success_rate);
    printf("平均共识时间: %.4f 秒\n", avg_consensus_time);
    printf("吞吐量: %.2f TPS\n", throughput);
    printf("总网络消息: %d\n", total_messages_sent);
    printf("平均每请求消息: %.1f\n", (double)total_messages_sent / REQUESTS_COUNT);

    // 节点状态摘要 - 对比诚实节点和拜占庭节点的表现
    printf("\n=== 节点状态摘要 ===\n");
    int honest_completed = 0, byzantine_completed = 0;
    for (int i = 0; i < N_NODES; i++)
    {
        if (nodes[i].type == HONEST)
        {
            honest_completed += nodes[i].completed_requests;
        }
        else
        {
            byzantine_completed += nodes[i].completed_requests;
        }
    }
    printf("诚实节点平均完成: %.1f\n", (double)honest_completed / (N_NODES - BYZANTINE_NODES));
    if (BYZANTINE_NODES > 0)
    {
        printf("拜占庭节点平均完成: %.1f\n", (double)byzantine_completed / BYZANTINE_NODES);
    }

    // 清理所有资源
    cleanup_nodes();
    CloseHandle(consensus_complete_event);
    DeleteCriticalSection(&global_cs);
    free(threads);

    return 0;
}

------------------------------------------------------------------------------------------------------------

                                                                                                    更新于  2026.3.28 晚9:43

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐