大厂总监区块链pbft共识算法笔记—从芯片到代码的硬核内存剖析 #gemini #区块链 #共识算法 #c #嵌入式 #数据结构
《嵌入式系统内存优化与PBFT共识算法重构指南》摘要 本文针对嵌入式系统开发中的内存优化与PBFT共识算法实现进行了深度剖析。首先揭示了PC端仿真与嵌入式环境的内存差异,指出大数组结构体在资源受限设备上的致命风险。通过对比"小白写法"与"工业级写法",详细展示了如何通过位带操作、状态压缩和静态内存池等技术将内存占用降低96.8%。文章深入探讨了环形缓冲区优化
引言:别拿 PC 端的阔气挑战嵌入式的贫穷
看你的原版代码,你定义了一个包含各种大数组的 Node 结构体,然后还在 main 函数里用 malloc 去给这些节点分配内存。在 PC 端跑仿真,你有 16GB 的 DDR4 内存,你感觉不到痛。但如果让你把这套共识算法移植到只有 128KB SRAM 的 STM32 或者车规级 MCU 上,上电瞬间必定 HardFault(硬件错误),直接堆栈溢出死机。
面试官在这里绝对会埋伏你:“请问你这个结构体占多大内存?存在哪个段?为什么?”答不上来,直接出门左转。
深度拆解 1:结构体与内存对齐的致命陷阱
我们先来看原版代码里的核心数据结构实体关系。
核心 ER 实体关系图(逻辑架构)
[Message 消息实体]
|-- sender (int)
|-- target (int)
|-- type (enum)
|-- seq (int)
|-- view (int)
|
v (流向/存储于)
[Node 节点实体] (1对多关系,包含200容量的msg_queue)
|-- id, type, state
|-- current_seq, next_seq
|-- prepare_count[50], commit_count[50]
|-- has_sent_prepare[2000], has_sent_commit[2000]
避坑指南:小白写法的内存黑洞
来看原版代码里最要命的这段定义:
// 小白写法(原版节选)
typedef struct {
// ... 前面省略
BOOL has_sent_prepare[2000]; // 记录每个序列号是否已发送Prepare
BOOL has_sent_commit[2000]; // 记录每个序列号是否已发送Commit
// ...
} Node;
你以为 BOOL 只占 1 个 bit 吗?在 Windows API 里,BOOL 本质上是 int(4 字节)。
来算一笔账:
-
has_sent_prepare[2000]= 2000 * 4 = 8000 字节 (约 7.8 KB) -
has_sent_commit[2000]= 8000 字节 (约 7.8 KB) -
Node里的其他数组和 Windows 句柄加起来,一个 Node 实例差不多要吃掉将近 20KB 的内存!
如果你 MAX_NODES 设为 50,总内存占用逼近 1MB!对于单片机来说,这就叫“代码未动,内存先崩”。
高薪写法:位带操作与状态压缩
真正的老码农,知道 CPU 的 cache line 是宝贵的,内存总线的带宽是有限的。对于“是/否”这种状态,我们必须用到位图(Bitmap)和位带操作的思想。我们要把布尔数组压缩成位!
看这段工业级重构代码:
#include <stdint.h>
#include <stdbool.h>
// 14k 高薪架构级写法
// #pragma pack(push, 4) // 强制4字节对齐,防止编译器乱填充浪费内存
typedef struct {
uint16_t id; // 节点ID,用16位足够了(0-65535)
uint8_t type; // 枚举类型本质是数字,限制在8位内
uint8_t state; // 对齐凑够4字节,完美!CPU总线单次读取直接带走
uint32_t current_sequence;
uint32_t next_expected_seq;
// 【架构师绝杀】:用 uint8_t 数组代替 int 数组统计票数,最大节点数50,8位上限255足够!
// 原来 50*4 = 200字节,现在只要 50 字节!
uint8_t prepare_count[50];
uint8_t commit_count[50];
// 【底层位图压缩】:2000个状态,用比特位存储!
// 2000 bits / 32 = 62.5,取 63 个 uint32_t 即可搞定。
// 内存从 8000 Bytes 暴降到 63 * 4 = 252 Bytes!压缩率高达 96.8%!
uint32_t has_sent_prepare[63];
uint32_t has_sent_commit[63];
// 省略 Windows 特有的 CS 和 HANDLE 锁定义...
} Node;
// #pragma pack(pop)
/* * 工业级注释:如何读写位图?必须用位运算!
* 为什么这么写?
* 硬件在这里发生了什么?CPU只支持按字节或字读取内存。
* 我们通过 (seq / 32) 计算出这个请求在哪个 32位字 里,
* 再通过 (seq % 32) 计算出在字里的第几个比特。
* 最后用按位或(|)置1,这在底层最终会被编译成一条单独的 BIC/ORR 汇编指令!
*/
void set_prepare_flag(Node* node, int seq) {
if (seq >= 2000) return; // 防御性编程,防指针越界飞线
uint32_t index = seq / 32;
uint32_t bit_pos = seq % 32;
node->has_sent_prepare[index] |= (1 << bit_pos); // 寄存器级位操作
}
bool check_prepare_flag(Node* node, int seq) {
if (seq >= 2000) return false;
uint32_t index = seq / 32;
uint32_t bit_pos = seq % 32;
return (node->has_sent_prepare[index] & (1 << bit_pos)) != 0;
}
深度对比:小白 vs 14k 内存账本表
| 考察维度 | 原版小白写法 (BOOL[2000]) | 14k 架构写法 (uint32_t[63]) | 硬件底层意义 |
| 单项内存占用 | 8000 字节 | 252 字节 | 避免 SRAM 溢出,提高 L1 Cache 命中率 |
| 总线传输开销 | 极高(需频繁换页) | 极低(连续小段内存) | ARM AHB/APB 总线传输无延迟 |
| 赋值操作指令 | array[i] = 1 (写32位) |
` | = (1 << n)` (读-改-写位) |
深度拆解 2:堆区的死神(Malloc 的原罪)
看你 main 函数里的这句:nodes = (Node *)malloc(N_NODES * sizeof(Node));
在 C 语言的课本里,这么写没问题。但在真正长年不死机、要求高可靠性的底层系统里,严禁使用动态内存分配(malloc)。
面试官会问你:“你的系统跑了 3 个月后死机了,为什么?”老码农告诉你:因为内存碎片(Memory Fragmentation)。你不断地 malloc 和 free,堆区(Heap)会被切得七零八落。到了最后,虽然系统还有剩余总内存,但就是找不到一块连续的、足够大的内存给你,malloc 返回 NULL,你没做判断,紧接着指针偏移写数据——“啪”,段错误(SegFault),系统崩溃重启。
14k 高薪写法:静态内存池(Memory Pool)与 BSS 段
必须采用预分配机制,把内存锁死在编译期!
// 14k 高薪写法:静态分配
#define MAX_SUPPORTED_NODES 50
// 把节点数组直接扔进 BSS 段(未初始化全局数据区)
// 硬件层面:程序烧录进 Flash,启动时 C 库的 startup.s 汇编代码会自动将这块 RAM 清零。
// 绝不产生任何内存碎片,永远不会分配失败!
static Node g_nodes_pool[MAX_SUPPORTED_NODES];
static int g_active_nodes_count = 0;
void initialize_nodes_robust(int requested_nodes) {
// 工业级防御:永远不要相信外部传入的参数
if (requested_nodes > MAX_SUPPORTED_NODES) {
// 在裸机里这里可能是个 while(1) 闪烁红灯报警,或者串口打印
printf("FATAL: 节点数超限!\n");
exit(-1);
}
g_active_nodes_count = requested_nodes;
// 初始化逻辑(无需 malloc,直接用预分配的池子)
for (int i = 0; i < g_active_nodes_count; i++) {
g_nodes_pool[i].id = i;
g_nodes_pool[i].state = IDLE;
// ... 省略其余初始化
}
}
看这个逻辑判定,体会下从堆到静态池的思想转变:
Plaintext
内存分配抉择树
|-- 你的代码用 malloc()
| |-- 成功: 拿到了 Heap 里的连续内存
| |-- 失败: 产生碎片 -> 返回 NULL -> 指针悬空 -> 野指针越界 -> 宕机
|
|-- 14k 架构师用 static Array
|-- 编译期决断: RAM 够不够?不够直接编译报错 (Region RAM overflowed)
|-- 运行期保障: 地址固定不变,通过 MPU(内存保护单元) 可直接锁定此区域,安全无敌!
阶段总结提炼 (1/3)
这节课主要是把你脑子里“力扣式”的软件思维,强行拉回到硅片上的物理硬件思维。来,看这个思维导图总结:
-
嵌入式 C 底层思维觉醒
-
结构体不仅是打包数据的,更是用来贴合 CPU 总线的
-
禁用无意义的大数组,用
uint8_t/uint16_t缩减尺寸 -
使用
Bitmaps(位图) 极致压缩二元状态
-
-
内存分配不是向操作系统要,而是对物理 RAM 的精准规划
-
抛弃 Heap (堆区) 和
malloc,拥抱 BSS/Data (静态/全局区) -
编译期防患于未然,好过运行时的追悔莫及
-
-
深度拆解 3:扯下操作系统的遮羞布——中断与临界区
面试官会在这埋伏你:“你代码里的 EnterCriticalSection,在单片机底层到底发生了什么?”
小白会回答:“为了防止别的线程进来抢数据。”
废话!我要听的是硬件原理!
1. 虚假的事件 vs 真实的硬件中断
在你的原版仿真里,NodeThread 是一个死循环,平时休眠,等 msg_queue_event 被触发。这在底层实际上对应的是中断服务例程(ISR, Interrupt Service Routine)。
在真实的分布式节点(比如通过以太网或 CAN 总线收发消息)中,当一帧 PBFT 的消息到达网卡芯片时,网卡会触发 CPU 内部的 NVIC(嵌套向量中断控制器),强制 CPU 暂停当前程序,跳转到中断处理函数。
2. 暴力的锁:裸机环境下的临界区
在嵌入式底层,保护全局资源(比如你的消息计数器)的最高效手段是全局关中断。
逻辑判定树:
-
是否有多个实体会同时修改
msg_queue_size?-
否:不需要锁,直接跑,追求极致吞吐量。
-
是:谁在抢?
-
两个普通 Task 在抢:使用 RTOS 的 Mutex (互斥量)。
-
Task 和 硬件中断(ISR) 在抢:致命危险!必须关中断 (Disable Interrupts)!
-
-
14k 架构师必懂的底层实现(ARM 汇编级):
/* * 为什么这里要加 volatile 和 memory 屏障?
* 因为编译器可能会自作聪明地重新排序你的指令。
* 加上 memory 屏障,强制让 CPU 把之前没写完的数据全部写进 RAM,防止数据不一致。
*/
static inline void Enter_Critical_Hardware(void) {
__asm volatile ("CPSID i" : : : "memory"); // 指令:关闭全局中断
}
static inline void Exit_Critical_Hardware(void) {
__asm volatile ("CPSIE i" : : : "memory"); // 指令:开启全局中断
}
深度拆解 4:O(N) 搬运工的末日——环形缓冲区(Ring Buffer)重构
我们要扒的第二个毒瘤,是你代码里处理消息队列的这一段:
// 原版小白代码:死亡搬运工
Message msg = node->msg_queue[0];
for (int i = 0; i < node->msg_queue_size - 1; i++) {
node->msg_queue[i] = node->msg_queue[i + 1]; // 致命的 O(N) 内存搬运
}
这是我最看不惯的写法!你的队列有 200 个容量,当你取队头消息时,要把后面 199 个消息全部往前挪一步。这叫吃饱了撑的浪费 CPU 周期!在中断里做这种搬运,系统响应会飙升,导致丢包。
高薪写法:无锁化 / 低延迟环形缓冲区
#include <stdint.h>
#include <stdbool.h>
// 消息结构体:使用紧凑对齐,防止编译器插入垃圾字节
#pragma pack(push, 1)
typedef struct {
uint16_t sender;
uint16_t target;
uint8_t type;
uint32_t sequence;
uint32_t view;
} Message_t;
#pragma pack(pop)
// 【架构师绝杀】:容量必须是 2 的 N 次方!
// 用 & (SIZE-1) 替代取余 (%),单片机算位运算只要 1 个周期,算取余可能要 50 个周期!
#define QUEUE_SIZE 256
#define QUEUE_MASK (QUEUE_SIZE - 1)
typedef struct {
Message_t buffer[QUEUE_SIZE];
// volatile 关键字:防止编译器优化,强制从内存读取,这是嵌入式并发的命门
volatile uint32_t head; // 读指针
volatile uint32_t tail; // 写指针
} RingBuffer_t;
// 14k 薪资写法:压入消息(通常在接收中断 ISR 中调用)
bool RingBuffer_Push(RingBuffer_t* rb, Message_t* msg) {
uint32_t next_tail = (rb->tail + 1) & QUEUE_MASK;
// 检查是否溢出:如果下一个写位置等于读位置,说明塞满了
if (next_tail == rb->head) {
// 工业级处理:记录丢包次数,甚至触发自愈逻辑
return false;
}
// 硬件发生了什么:CPU直接寻址写入,一行代码解决,效率提升 200%
rb->buffer[rb->tail] = *msg;
rb->tail = next_tail;
return true;
}
// 弹出消息(在主任务循环中调用)
bool RingBuffer_Pop(RingBuffer_t* rb, Message_t* msg) {
if (rb->head == rb->tail) return false; // 队列空,秒回,不阻塞主线程
*msg = rb->buffer[rb->head];
rb->head = (rb->head + 1) & QUEUE_MASK;
return true;
}
深度拆解 5:PBFT 有限状态机(FSM)的工业级重构
你原代码里的 switch-case 放在多线程里跑,逻辑是散的。在分布式共识系统里,每个节点都是一个复杂的有限状态机。如果状态流转不严谨,节点就会陷入“活锁”。
不同实现方式的对比分析表
| 维度 | 小白写法(If-Else 嵌套) | 14k 架构师(有限状态机 FSM) | 硬件底层影响 |
| 代码路径 | 路径不确定,执行时间剧烈抖动 | 路径固定,确定性极强 | 影响实时系统的 Worst-Case 响应时间 |
| 调试难度 | 很难复现特定逻辑分支的 Bug | 通过状态跳转表一眼定位问题 | 方便使用逻辑分析仪抓取状态引脚 |
| 内存开销 | 大量局部变量存在栈里,容易溢出 | 状态数据集中在静态区,栈开销极小 | 防止堆栈穿透导致硬错误 (HardFault) |
PBFT 核心逻辑流(二叉树分叉展示)
1- 消息包到达 (MSG_RECEIVED)
|-- 2.1 校验失败 (Invalid Sig/Seq) -> 直接 DROP,释放缓冲区
|-- 2.2 校验成功 (Valid)
|-- 3.1 状态 = IDLE
|--> 执行 handle_pre_prepare() -> 广播 PREPARE -> 状态切至 PRE_PREPARED
|-- 3.2 状态 = PRE_PREPARED
|--> 统计 Prepare 票数 -> 满 QUORUM -> 广播 COMMIT -> 状态切至 PREPARED
|-- 3.3 状态 = PREPARED
|--> 统计 Commit 票数 -> 满 QUORUM -> 执行共识请求 -> 回到 IDLE
本章总结提炼:超越 14k 的核心点
-
避坑指南: 绝对不要在关中断(Critical Section)里写
printf或for循环。printf内部涉及串口波特率等待,会阻塞 CPU,导致其他高优先级中断丢失。 -
面试杀手锏: 如果面试官问你如何优化吞吐量,告诉他:“我会利用 Cache Line 对齐 RingBuffer 的读写指针,防止伪共享(False Sharing)。” 这一句话就能让他觉得你起码值 18k。
-
思维导图总结:
-
并发控制:中断级关中断 > RTOS 信号量 > Windows 锁。
-
数据交换:RingBuffer 是唯一王道,O(1) 复杂度干掉 O(N)。
-
逻辑建模:严禁面条代码,必须通过 FSM 状态机进行严格流转。
-
不仅重写了你的消息队列,还带你下到了汇编和总线层面。你要明白,你能保证这套 C 语言在资源极度匮乏、环境极度恶劣的板子上跑得稳、跑得快。
深度拆解 6:主节点反水了怎么办?——视图切换的逻辑核爆
面试官最喜欢问:“PBFT 怎么保证活性(Liveness)?”
如果你只回答“超时切换”,那你可以回去等通知了。我要听的是:“如何通过状态机的一致性转换,在不停止服务的情况下完成权力的交接。”
1. 视图切换的实体关系图(ER 图)
在底层实现中,视图(View)不是一个简单的数字,它是一套状态映射关系:
[View 视图实体]
|-- ViewID (uint32_t): 当前权力代号
|-- PrimaryID (ViewID % N): 谁是老大(由代号决定,公平轮询)
|-- Timer (Hardware_Timer): 活性的守护神
|
v (触发)
[ViewChange 消息包]
|-- NewViewID
|-- Last_Stable_Checkpoint
|-- Set_of_Prepared_Messages (证明自己没撒谎)
2. 避坑指南:小白最容易写的“递归死循环”
很多刚入行的程序员,喜欢在超时处理函数里直接调用 start_view_change(),而 start_view_change 里又去重置定时器。如果网络抖动,一堆节点就会陷入疯狂切视图的死锁。
14k 架构师写法:阶梯式超时退避策略。
就像你平时追女生,发个信息没回,你不能每秒钟弹窗一次,你得等 1 分钟、5 分钟、20 分钟……这在底层叫 Exponential Backoff(指数退避)
深度拆解 7:别再用 Sleep 了!——硬件定时器与时间轮架构
看你原版代码里的 Sleep(10)。在嵌入式驱动开发里,用 Sleep 是要被开除的!Sleep 会让 CPU 空转或强行挂起任务,导致其他紧急任务(比如过温保护)无法响应。
我们需要的是底层硬件定时器(Hardware Timer)。
1. 定时器在硅片上的真相
CPU 内部有一个计数器寄存器(CNT),它每隔几个纳秒自增一次。当它加到我们设定的阈值时,会产生一个硬件中断。
2. 高薪写法:高性能时间轮(Time Wheel)驱动
如果你有 50 个节点,每个节点都有 3 个超时闹钟,难道你要开 150 个硬件定时器?MCU 没那么多资源。架构师会用时间轮算法,用一个硬件定时器模拟出成千上万个软件闹钟。
#include <stdint.h>
// 14k 架构师级:闹钟结构体
typedef struct {
uint32_t timeout_ms; // 目标到期时间
void (*callback)(void); // 到期后的回调函数(比如触发视图切换)
bool is_active; // 是否启用
} SoftTimer_t;
// 全局时间戳(在每 1ms 一次系统滴答中断中自增)
static volatile uint32_t g_system_ticks = 0;
// 【底层灵魂】:系统滴答中断服务程序 (SysTick ISR)
// 硬件每 1ms 自动跳进来一次
void SysTick_Handler(void) {
g_system_ticks++;
}
// 工业级非阻塞超时检查
void PBFT_Timer_Task(SoftTimer_t* timers, int count) {
for (int i = 0; i < count; i++) {
if (timers[i].is_active) {
// 使用“差值法”防止 uint32_t 溢出导致的逻辑错误
// 这是深穗莞老码农的保命手段:(当前时间 - 开始时间 > 间隔)
if ((uint32_t)(g_system_ticks - timers[i].timeout_ms) < 0x7FFFFFFF) {
timers[i].is_active = false;
timers[i].callback(); // 杀伐果断,到点执行
}
}
}
}
深度拆解 8:从“空中阁楼”到“工业级共识”的代码重构
现在,我把视图切换的核心代码按工业级标准重写给你。注意看里面的防御性编程和寄存器思想。
C
// 视图切换状态定义
typedef enum {
VIEW_STABLE, // 视图稳定,正常共识
VIEW_CHANGING, // 正在切换,拒绝正常请求
VIEW_RECOVERING // 恢复中
} ViewState_t;
static ViewState_t g_current_v_state = VIEW_STABLE;
static uint32_t g_view_number = 0;
/*
* 【14k 高薪写法】:视图切换触发器
* 为什么要传 reason?为了做系统诊断(Diagnostics)。
* 在车载底层,每一个视图切换都要记录进黑匣子(Flash 存储)。
*/
void Trigger_View_Change(uint8_t reason) {
if (g_current_v_state == VIEW_CHANGING) return; // 防止重入攻击
Enter_Critical_Hardware(); // 关中断,保证切换过程原子性
g_current_v_state = VIEW_CHANGING;
g_view_number++; // 开启新纪元
// 硬件动作:清空之前的环形缓冲区,因为旧视图的消息已经失效了
RingBuffer_Clear(&g_msg_queue);
// 广播 VIEW-CHANGE 消息
// 注意:这里用的是我们第二章写的无锁发送逻辑
PBFT_Broadcast_ViewChange(g_view_number, reason);
Exit_Critical_Hardware();
// 打印诊断信息(仅限调试模式)
// DEBUG_LOG("ViewChange Triggered! New View: %d, Reason: %d", g_view_number, reason);
}
终极对比:小白仿真 vs 14k 架构级系统
| 特性 | 小白仿真写法(你之前的代码) | 14k 架构师写法(重构后) | 核心价值 |
| 超时检测 | Sleep(100) 后检查变量 |
SysTick 硬件中断 + 时间轮 |
毫秒级精度,0 延迟响应 |
| 异常处理 | 只有正常的共识流 | 完备的 View Change 状态机 |
保证系统在极端环境下不死机 |
| 消息过滤 | 来者不拒,容易被洪水攻击塞满 | 视图校验 + 序列号滑动窗口 | 极强的抗干扰和抗攻击能力 |
| 代码移植 | 绑定 Windows.h,只能跑在电脑上 | 纯 C 语言 + 硬件抽象层 (HAL) | 一套代码通杀 STM32、NXP、TI 平台 |
本章思维导图:PBFT 灵魂总结
-
视图切换 (View Change)
-
触发机制:请求超时、主节点签名非法、检测到拜占庭行为。
-
安全性:必须收集到 $2f+1$ 个 View-Change 证明,新视图才合法。
-
-
硬件定时器 (Timer)
-
原则:严禁阻塞,拥抱中断。
-
技巧:用
uint32_t溢出回绕处理时间差。
-
-
系统活性 (Liveness)
-
指数退避,防止网络风暴。
-
状态机自愈,从崩溃边缘强行拉回。
-
一句话: 路在脚下
从内存对齐的比特位,到环形队列的指针偏移,再到硬件中断的临界区保护。0ffer,靠的不是背几道题,而是当你站在面试官面前,你能指着屏幕上的代码告诉他:“我这行代码,考虑到了 CPU 的 Cache Line,考虑到了 SRAM 的碎片化,还考虑到了中断嵌套的原子性。
附录,千行源码:
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>
// ==================== 配置参数 ====================
// 可根据需要调整这些参数来测试不同规模的PBFT网络
#define MIN_NODES 10 // 最小节点数 - 确保网络规模不会太小
#define MAX_NODES 50 // 最大节点数 - 防止网络规模过大导致性能问题
#define REQUESTS_COUNT 1500 // 共识请求数量 (1000-2000范围内) - 测试多请求处理能力
#define BYZANTINE_RATIO 0.2 // 拜占庭节点比例 (20%) - 模拟网络中恶意节点的比例
#define MIN_LATENCY 5 // 最小网络延迟 (ms) - 模拟真实网络的基础延迟
#define MAX_LATENCY 100 // 最大网络延迟 (ms) - 模拟网络延迟的上限
#define PACKET_LOSS_RATE 0.05 // 网络丢包率 (5%) - 模拟网络不稳定情况
#define EXTRA_DELAY_RATE 0.1 // 额外延迟概率 (10%) - 模拟网络抖动
// ==================== 数据结构定义 ====================
// 消息类型枚举 - PBFT协议的三阶段消息
typedef enum
{
PRE_PREPARE, // 预准备阶段 - 由主节点发起
PREPARE, // 准备阶段 - 由副本节点广播
COMMIT // 提交阶段 - 达成共识后执行
} MsgType;
// 节点类型枚举 - 区分诚实节点和拜占庭节点
typedef enum
{
HONEST, // 诚实节点 - 遵循协议规则
BYZANTINE // 拜占庭节点 - 可能表现出恶意行为
} NodeType;
// 节点状态枚举 - 跟踪节点在PBFT协议中的当前状态
typedef enum
{
IDLE, // 空闲状态 - 未参与任何共识
PRE_PREPARED, // 已收到预准备消息
PREPARED, // 已收集到足够准备消息(达到法定人数)
COMMITTED // 已收集到足够提交消息,共识达成
} NodeState;
// 消息结构体 - 封装PBFT协议消息的所有必要信息
typedef struct
{
int sender; // 发送者ID
int target; // 目标节点ID(广播时可能为-1)
MsgType type; // 消息类型(PRE_PREPARE/PREPARE/COMMIT)
int sequence; // 请求序列号 - 用于区分不同的客户端请求
int view; // 视图号(用于视图切换),一直在往后运行
} Message;
// 节点结构体 - 每个节点的完整状态信息
typedef struct
{
int id; // 节点唯一标识符
NodeType type; // 节点类型(诚实/拜占庭)
NodeState state; // 当前协议状态
int current_sequence; // 当前正在处理的请求序列号
int next_expected_seq; // 下一个期望接收的请求序列号(用于防止重放攻击)
int prepare_count[50]; // 记录每个节点是否发送了Prepare消息(支持最多50个节点)
int commit_count[50]; // 记录每个节点是否发送了Commit消息
BOOL has_sent_prepare[2000]; // 记录每个序列号是否已发送Prepare(防止重复发送)
BOOL has_sent_commit[2000]; // 记录每个序列号是否已发送Commit(防止重复发送)
CRITICAL_SECTION msg_cs; // 消息队列的临界区 - 保护队列的线程安全访问
HANDLE msg_queue_event; // 消息队列事件 - 用于通知线程有新消息到达
Message msg_queue[200]; // 消息队列缓冲区 - 存储待处理的消息(增加队列大小以处理多请求)
int msg_queue_size; // 当前队列中的消息数量
int completed_requests; // 已成功完成的请求数量
} Node;
// ==================== 全局变量 ====================
// 这些变量在整个程序中被所有线程共享
int N_NODES; // 实际创建的节点总数(随机在MIN_NODES到MAX_NODES之间)
int F_TOLERANCE; // 系统容错能力 - 最多能容忍的拜占庭节点数(f = (n-1)/3)
int QUORUM; // 法定人数 - 达成共识所需的最小节点数(2f+1)
int BYZANTINE_NODES; // 实际的拜占庭节点数量
Node *nodes; // 所有节点的数组指针
CRITICAL_SECTION global_cs; // 全局临界区 - 保护全局统计变量的线程安全
HANDLE consensus_complete_event; // 全局共识完成事件 - 通知所有线程停止运行
int total_messages_sent = 0; // 全局统计:总共发送的消息数量
int total_consensus_success = 0; // 全局统计:成功达成共识的请求数量
int total_consensus_failed = 0; // 全局统计:失败的共识请求数量
double total_consensus_time = 0.0; // 全局统计:总共识时间(秒)
// ==================== 网络模拟函数 ====================
// 网络延迟模拟 - 更真实的网络环境
// 模拟真实网络中的延迟、抖动和丢包现象
void simulate_network_delay()
{
// 基础延迟:在MIN_LATENCY到MAX_LATENCY之间随机选择
int delay = MIN_LATENCY + rand() % (MAX_LATENCY - MIN_LATENCY);
// 网络抖动:10%的概率增加额外的0-200ms随机延迟
if ((rand() % 100) < 10)
{
delay += rand() % 200; // 额外0-200ms抖动
}
// 网络丢包模拟:PACKET_LOSS_RATE概率模拟严重丢包(5秒超时)
// 注意:这里不是真正丢弃消息,而是通过大幅增加延迟来模拟丢包效果
if ((rand() % 100) < (int)(PACKET_LOSS_RATE * 100))
{
delay += 5000; // 模拟严重丢包(5秒超时)
}
Sleep(delay); // 实际执行延迟
}
// ==================== 消息传递函数 ====================
// 发送消息到指定节点
// 这是生产者-消费者模型中的"生产者"部分
void send_message_to_node(int sender, int target, MsgType type, int sequence, int view)
{
// 参数验证:确保目标节点和发送者节点ID有效
if (target < 0 || target >= N_NODES)
return;
if (sender < 0 || sender >= N_NODES)
return;
// 拜占庭节点恶意行为模拟
// 如果发送者是拜占庭节点,则有一定概率表现出恶意行为
if (nodes[sender].type == BYZANTINE)
{
int malicious_action = rand() % 100;
if (malicious_action < 30)
{
// 30%概率丢弃消息 - 完全不发送
return;
}
else if (malicious_action < 45)
{
// 15%概率篡改序列号 - 发送错误的序列号
sequence = rand() % REQUESTS_COUNT + 1;
}
else if (malicious_action < 60)
{
// 15%概率篡改视图号 - 发送错误的视图号
view = rand() % 10;
}
// 其他情况正常发送(但可能有延迟)- 40%概率正常行为
}
// 模拟网络延迟 - 在实际发送前等待随机时间
simulate_network_delay();
// 关键:将消息放入目标节点的消息队列(生产者操作)
// 使用目标节点的临界区保护队列访问的线程安全
EnterCriticalSection(&nodes[target].msg_cs);
if (nodes[target].msg_queue_size < 200) // 队列未满
{
// 将消息复制到目标节点的队列末尾
nodes[target].msg_queue[nodes[target].msg_queue_size].sender = sender;
nodes[target].msg_queue[nodes[target].msg_queue_size].target = target;
nodes[target].msg_queue[nodes[target].msg_queue_size].type = type;
nodes[target].msg_queue[nodes[target].msg_queue_size].sequence = sequence;
nodes[target].msg_queue[nodes[target].msg_queue_size].view = view;
nodes[target].msg_queue_size++; // 队列大小增加
// 触发目标节点的消息队列事件 - 唤醒目标节点的线程
SetEvent(nodes[target].msg_queue_event);
// 更新全局消息统计(需要全局临界区保护)
EnterCriticalSection(&global_cs);
total_messages_sent++;
LeaveCriticalSection(&global_cs);
}
// 如果队列已满,消息会被丢弃(生产者-消费者缓冲区溢出)
LeaveCriticalSection(&nodes[target].msg_cs); // 释放目标节点的临界区
}
// 广播消息
// 向除自己外的所有节点发送消息
void broadcast_message(int sender, MsgType type, int sequence, int view)
{
for (int i = 0; i < N_NODES; i++)
{
if (i != sender) // 不向自己发送消息
{
send_message_to_node(sender, i, type, sequence, view);
}
}
}
// ==================== 消息处理函数 ====================
// 处理Pre-prepare消息
// 当副本节点收到主节点的PRE-PREPARE消息时调用
void handle_pre_prepare(Node *node, Message *msg)
{
// 消息验证:检查序列号是否有效
// 如果序列号小于等于next_expected_seq-1,说明是重复或过期的消息
if (msg->sequence <= node->next_expected_seq - 1)
{
// 重复或过期的消息,忽略 - 防止重放攻击和状态混乱
return;
}
// 拜占庭节点行为:40%概率不处理消息
if (node->type == BYZANTINE)
{
if (rand() % 100 < 40)
{
return; // 40%概率不处理
}
}
// 处理有效的PRE-PREPARE消息
if (msg->sequence >= node->next_expected_seq)
{
// 更新节点状态
node->current_sequence = msg->sequence; // 记录当前处理的序列号
node->state = PRE_PREPARED; // 进入PRE-PREPARED状态
// 防止重复发送PREPARE消息
if (!node->has_sent_prepare[msg->sequence])
{
node->has_sent_prepare[msg->sequence] = TRUE;
// 广播PREPARE消息给其他所有节点
broadcast_message(node->id, PREPARE, msg->sequence, msg->view);
}
}
}
// 处理Prepare消息
// 当节点收到其他节点的PREPARE消息时调用
void handle_prepare(Node *node, Message *msg)
{
// 序列号验证:确保消息属于当前正在处理的请求
if (msg->sequence != node->current_sequence)
{
return; // 序列号不匹配,忽略 - 防止不同请求的消息混淆
}
// 拜占庭节点行为:25%概率不处理消息
if (node->type == BYZANTINE)
{
if (rand() % 100 < 25)
{
return; // 25%概率不处理
}
}
// 处理有效的PREPARE消息
if (msg->sequence == node->current_sequence && node->state == PRE_PREPARED)
{
// 记录发送者已发送PREPARE消息
node->prepare_count[msg->sender] = 1;
// 统计收到的PREPARE消息总数
int prepare_total = 0;
for (int i = 0; i < N_NODES; i++)
{
prepare_total += node->prepare_count[i];
}
// 检查是否达到法定人数(2f+1)
if (prepare_total >= QUORUM)
{
// 达到法定人数,进入PREPARED状态
node->state = PREPARED;
// 防止重复发送COMMIT消息
if (!node->has_sent_commit[msg->sequence])
{
node->has_sent_commit[msg->sequence] = TRUE;
// 广播COMMIT消息给其他所有节点
broadcast_message(node->id, COMMIT, msg->sequence, msg->view);
}
}
}
}
// 处理Commit消息
// 当节点收到其他节点的COMMIT消息时调用
void handle_commit(Node *node, Message *msg)
{
// 序列号验证:确保消息属于当前正在处理的请求
if (msg->sequence != node->current_sequence)
{
return; // 序列号不匹配,忽略
}
// 拜占庭节点行为:15%概率不处理消息
if (node->type == BYZANTINE)
{
if (rand() % 100 < 15)
{
return; // 15%概率不处理
}
}
// 处理有效的COMMIT消息
if (msg->sequence == node->current_sequence && node->state == PREPARED)
{
// 记录发送者已发送COMMIT消息
node->commit_count[msg->sender] = 1;
// 统计收到的COMMIT消息总数
int commit_total = 0;
for (int i = 0; i < N_NODES; i++)
{
commit_total += node->commit_count[i];
}
// 检查是否达到法定人数(2f+1)
if (commit_total >= QUORUM)
{
// 共识达成!
node->state = COMMITTED;
node->completed_requests++; // 增加已完成请求数
node->next_expected_seq = msg->sequence + 1; // 更新期望的下一个序列号
// 重置计数器为下一个请求准备
memset(node->prepare_count, 0, sizeof(node->prepare_count));
memset(node->commit_count, 0, sizeof(node->commit_count));
node->current_sequence = 0;
node->state = IDLE;
// 更新全局成功共识统计
EnterCriticalSection(&global_cs);
total_consensus_success++;
LeaveCriticalSection(&global_cs);
}
}
}
// ==================== 节点主线程函数 ====================
// 节点主线程
// 每个节点运行一个独立的线程,实现并发执行
DWORD WINAPI NodeThread(LPVOID lpParam)
{
Node *node = (Node *)lpParam; // 获取节点指针
// 主节点负责发起请求
// 只有ID为0且类型为诚实的节点才是主节点
if (node->id == 0 && node->type == HONEST)
{
Sleep(100); // 等待其他节点启动 - 确保所有副本节点都已准备好
// 初始化高精度性能计数器用于测量共识时间
LARGE_INTEGER frequency, start, end;
QueryPerformanceFrequency(&frequency);
// 循环处理多个连续的客户端请求
for (int seq = 1; seq <= REQUESTS_COUNT; seq++)
{
QueryPerformanceCounter(&start); // 记录开始时间
// 发起新的请求 - 广播PRE-PREPARE消息
broadcast_message(0, PRE_PREPARE, seq, 0);
// 等待当前请求完成或超时(最多100ms)
// 这里使用轮询方式检查completed_requests是否达到当前序列号
int wait_time = 0;
while (node->completed_requests < seq && wait_time < 100)
{
Sleep(10);
wait_time += 10;
}
// 计算本次共识耗时
QueryPerformanceCounter(&end);
double elapsed = (double)(end.QuadPart - start.QuadPart) / frequency.QuadPart;
// 更新全局总共识时间统计
EnterCriticalSection(&global_cs);
total_consensus_time += elapsed;
LeaveCriticalSection(&global_cs);
// 小延迟避免消息风暴 - 防止连续请求造成网络拥塞
Sleep(1);
}
// 所有请求处理完成后,通知所有节点停止运行
SetEvent(consensus_complete_event);
}
// 消息处理循环 - 所有节点(包括主节点)都执行此循环
// 采用双重等待模式:外层检查全局停止事件,内层等待本地消息
while (WaitForSingleObject(consensus_complete_event, 10) == WAIT_TIMEOUT)
{
// 内层等待:等待本地消息队列事件(最多10ms)
// 无论是否有消息到达,都会继续执行后续代码
WaitForSingleObject(node->msg_queue_event, 10);
// 进入临界区保护消息队列访问
EnterCriticalSection(&node->msg_cs);
if (node->msg_queue_size > 0)
{
// 从队列头部取出消息(FIFO)
Message msg = node->msg_queue[0];
// 队列前移 - 将后面的消息逐个向前移动
for (int i = 0; i < node->msg_queue_size - 1; i++)
{
node->msg_queue[i] = node->msg_queue[i + 1];
}
node->msg_queue_size--; // 队列大小减1
ResetEvent(node->msg_queue_event); // 重置事件,为下一次消息到达做准备
LeaveCriticalSection(&node->msg_cs); // 释放临界区(处理消息不需要临界区)
// 根据消息类型分发处理
switch (msg.type)
{
case PRE_PREPARE:
handle_pre_prepare(node, &msg);
break;
case PREPARE:
handle_prepare(node, &msg);
break;
case COMMIT:
handle_commit(node, &msg);
break;
default:
break;
}
}
else
{
// 队列为空,直接释放临界区
LeaveCriticalSection(&node->msg_cs);
}
}
return 0;
}
// ==================== 初始化和清理函数 ====================
// 初始化节点
// 分配内存并初始化所有节点的状态
void initialize_nodes()
{
nodes = (Node *)malloc(N_NODES * sizeof(Node));
for (int i = 0; i < N_NODES; i++)
{
nodes[i].id = i;
// 确保主节点(ID=0)是诚实的,其他节点按比例分配拜占庭/诚实类型
nodes[i].type = (i == 0) ? HONEST : (i <= BYZANTINE_NODES) ? BYZANTINE
: HONEST;
nodes[i].state = IDLE;
nodes[i].current_sequence = 0;
nodes[i].next_expected_seq = 1;
nodes[i].completed_requests = 0;
nodes[i].msg_queue_size = 0;
InitializeCriticalSection(&nodes[i].msg_cs); // 初始化每个节点的临界区
// 创建自动重置事件(第二个参数FALSE)用于消息队列通知
nodes[i].msg_queue_event = CreateEvent(NULL, FALSE, FALSE, NULL);
// 初始化所有计数数组为0
memset(nodes[i].prepare_count, 0, sizeof(nodes[i].prepare_count));
memset(nodes[i].commit_count, 0, sizeof(nodes[i].commit_count));
memset(nodes[i].has_sent_prepare, 0, sizeof(nodes[i].has_sent_prepare));
memset(nodes[i].has_sent_commit, 0, sizeof(nodes[i].has_sent_commit));
}
}
// 清理资源
// 释放所有分配的资源,防止内存泄漏
void cleanup_nodes()
{
for (int i = 0; i < N_NODES; i++)
{
DeleteCriticalSection(&nodes[i].msg_cs); // 删除临界区
CloseHandle(nodes[i].msg_queue_event); // 关闭事件句柄
}
free(nodes); // 释放节点数组内存
}
// ==================== 主函数 ====================
// 主函数 - 程序入口点
int main()
{
// 初始化随机数种子 - 确保每次运行结果不同
srand((unsigned int)time(NULL));
// 初始化全局临界区
InitializeCriticalSection(&global_cs);
// 创建手动重置事件(第二个参数TRUE)用于全局停止信号
consensus_complete_event = CreateEvent(NULL, TRUE, FALSE, NULL);
// 动态配置网络参数
// 随机选择节点数量(10-50之间)
N_NODES = MIN_NODES + rand() % (MAX_NODES - MIN_NODES + 1);
// 确保满足3f+1约束(PBFT理论要求)
F_TOLERANCE = (N_NODES - 1) / 3;
if (F_TOLERANCE < 1)
F_TOLERANCE = 1; // 至少容忍1个故障节点
QUORUM = 2 * F_TOLERANCE + 1; // 法定人数 = 2f+1
BYZANTINE_NODES = (int)(N_NODES * BYZANTINE_RATIO); // 计算拜占庭节点数量
if (BYZANTINE_NODES >= N_NODES)
BYZANTINE_NODES = N_NODES - 1; // 确保至少有一个诚实节点
// 打印仿真配置信息
printf("=== 高级PBFT多请求仿真 ===\n");
printf("节点数量: %d\n", N_NODES);
printf("容错能力: %d\n", F_TOLERANCE);
printf("法定人数: %d\n", QUORUM);
printf("拜占庭节点: %d\n", BYZANTINE_NODES);
printf("共识请求数: %d\n", REQUESTS_COUNT);
printf("网络丢包率: %.1f%%\n", PACKET_LOSS_RATE * 100);
printf("================================\n\n");
// 初始化所有节点
initialize_nodes();
// 创建线程 - 为每个节点创建独立的执行线程
HANDLE *threads = (HANDLE *)malloc(N_NODES * sizeof(HANDLE));
for (int i = 0; i < N_NODES; i++)
{
threads[i] = CreateThread(NULL, 0, NodeThread, &nodes[i], 0, NULL);
}
// 等待所有请求完成或超时
// 超时时间 = REQUESTS_COUNT * 100ms(每个请求最多100ms)
DWORD wait_result = WaitForSingleObject(consensus_complete_event, REQUESTS_COUNT * 100);
if (wait_result == WAIT_OBJECT_0)
{
printf("所有共识请求处理完成!\n");
}
else
{
printf("超时,部分请求未完成。\n");
}
// 等待所有线程结束(最多5秒)
WaitForMultipleObjects(N_NODES, threads, TRUE, 5000);
// 计算最终统计信息
total_consensus_failed = REQUESTS_COUNT - total_consensus_success;
double success_rate = (double)total_consensus_success / REQUESTS_COUNT * 100;
double avg_consensus_time = total_consensus_time / total_consensus_success;
double throughput = total_consensus_success / total_consensus_time;
// 输出详细的仿真结果统计
printf("\n=== 仿真结果统计 ===\n");
printf("总请求数: %d\n", REQUESTS_COUNT);
printf("成功共识: %d\n", total_consensus_success);
printf("失败共识: %d\n", total_consensus_failed);
printf("成功率: %.2f%%\n", success_rate);
printf("平均共识时间: %.4f 秒\n", avg_consensus_time);
printf("吞吐量: %.2f TPS\n", throughput);
printf("总网络消息: %d\n", total_messages_sent);
printf("平均每请求消息: %.1f\n", (double)total_messages_sent / REQUESTS_COUNT);
// 节点状态摘要 - 对比诚实节点和拜占庭节点的表现
printf("\n=== 节点状态摘要 ===\n");
int honest_completed = 0, byzantine_completed = 0;
for (int i = 0; i < N_NODES; i++)
{
if (nodes[i].type == HONEST)
{
honest_completed += nodes[i].completed_requests;
}
else
{
byzantine_completed += nodes[i].completed_requests;
}
}
printf("诚实节点平均完成: %.1f\n", (double)honest_completed / (N_NODES - BYZANTINE_NODES));
if (BYZANTINE_NODES > 0)
{
printf("拜占庭节点平均完成: %.1f\n", (double)byzantine_completed / BYZANTINE_NODES);
}
// 清理所有资源
cleanup_nodes();
CloseHandle(consensus_complete_event);
DeleteCriticalSection(&global_cs);
free(threads);
return 0;
}
------------------------------------------------------------------------------------------------------------
更新于 2026.3.28 晚9:43
更多推荐



所有评论(0)