技术总监万行代码详解:C语言仿真区块链PBFT共识算法 #1 单共识7节点版本 #从codeforAC到吃透区块链底层共识 #gemini
本文以工程化视角拆解 PBFT 共识算法,通过 C 语言多线程仿真,将抽象的分布式一致性问题还原为“线程 + 消息队列 + 状态机”的底层实现。文章从系统模型出发,深入分析 N=3f+1 与 2f+1 法定人数的安全本质,逐步讲透 Pre-Prepare、Prepare、Commit 三阶段设计的必要性。同时,对原始仿真代码进行系统级重构:引入 O(1) 环形队列优化性能,采用事件驱动模型提升并发
--------------------------------------------------------------------------------------------------------------------------- 更新于2026.3.30
如果一听到“区块链”、“PBFT共识算法”、“拜占庭容错”这些词就觉得高大上、云里雾里——那这篇文章就是为你量身定制的。
别被高大上的名词唬住了。抛开炒币的噱头,分布式共识系统,在底层C程序员眼里,就是一个多线程(多进程)通过网络互相发结构体,用状态机维护自身状态,并用锁和事件机制解决并发冲突的系统。
今天,咱们就拿一份通义灵码写的PBFT仿真代码开刀。这份代码作为原型非常棒,跨越了单线程递归的玩具级别,上了真·多线程并发。但站在工业级底层开发的角度,它还有硬伤。
今天,我们就来一场“代码级解剖”,不仅要看懂,还要把它改得更牛逼。看完这篇,面试官问你分布式,你可以直接拿底层的并发和队列去碾压他。如果觉得干货够硬,别忘了点赞、收藏、关注一波,老哥的心血不能白费!
一、 总览:PBFT到底在干嘛?(总-分-总解析)
1.1 核心思想 (总)
一句话概括:在一群节点(机器)里,有几台可能宕机,甚至可能恶意发假消息(拜占庭节点)。PBFT算法能保证只要好人的数量足够多,整个系统就能达成一致的决定。
数学铁律:N = 3f + 1。
-
N:总节点数 -
f:能容忍的最大坏蛋/故障节点数。 -
为什么是
3f+1?因为坏人有f个,为了压制坏人,好人必须比坏人多,且在网络分区时也能达成多数派。
1.2 角色与状态映射 (分)
在C语言的视角下,我们把高大上的概念翻译成代码实体:
|
区块链/分布式名词 |
C语言/底层系统映射 |
原代码实现 |
|---|---|---|
|
Node (节点) |
一个独立的线程 (Thread) |
|
|
P2P网络广播 |
内存拷贝 + 锁并发控制 |
|
|
网络延迟/丢包 |
随机数生成器 + 线程休眠 |
|
|
共识状态机 |
|
|
|
事件通知 |
操作系统事件对象/信号量 |
Windows API |
1.3 小结:多线程单机仿真 (总)
我们的仿真环境,就是用一台电脑上的多个线程,模拟分布在世界各地的多台服务器。线程间通过读写彼此的“消息队列(数组)”来模拟网络通信。既然是多线程读写共享变量,那**临界区(锁)**就绝对跑不掉。
二、 数据结构扒皮:一个节点的五脏六腑
我们先来看原代码中最核心的数据结构 Node。看懂了它,你就懂了70%。
2.1 Node结构体 ER模型分析
一个完整的PBFT节点,不仅要有身份证(ID),还要有账本(记票器)、状态机、以及最重要的——收件箱(带锁的队列)。
[ Node (节点实体) ]
├─ 基础属性
│ ├─ id: int (节点编号 0~6)
│ └─ type: NodeType (好人 HONEST / 坏人 BYZANTINE)
│
├─ PBFT核心状态机
│ ├─ state: NodeState (当前状态:IDLE -> PRE_PREPARED -> PREPARED -> COMMITTED)
│ └─ sequence: int (当前正在处理的提案序列号)
│
├─ 记票簿 (防双花、防作伪)
│ ├─ prepare_count[N_NODES]: int数组 (记录收到了谁的Prepare票)
│ ├─ commit_count[N_NODES]: int数组 (记录收到了谁的Commit票)
│ ├─ has_sent_prepare: BOOL (防重发标记位)
│ └─ has_sent_commit: BOOL (防重发标记位)
│
└─ 线程通信组件 (底层硬核部分)
├─ msg_cs: CRITICAL_SECTION (Windows临界区,相当于 pthread_mutex)
├─ msg_queue_event: HANDLE (Windows事件,相当于 pthread_cond / 信号量)
├─ msg_queue[50]: Message数组 (收件箱 / 消息队列)
└─ msg_queue_size: int (当前队列长度)
2.2 为什么必须要有锁和事件?
很多刚学完指针的同学写仿真,就是直接 node[1].state = COMMITTED,这叫上帝视角,根本不是分布式。
真正的网络中,Node 0 想给 Node 1 发消息,本质是 Node 0 的线程要把数据写入 Node 1 的 msg_queue。 想象一下,如果 Node 2 和 Node 3 同时向 Node 1 发消息,不加锁会怎样? 内存踩踏! 数据全乱了。
所以,原代码使用 CRITICAL_SECTION 是非常地道的底层思维。
三、 并发架构与事件驱动 (Event-Driven)
3.1 摒弃死循环,拥抱事件驱动
一般初学者写线程,容易写成这样:
// 菜鸟写法:CPU 占用率 100% 狂飙
while(1) {
if(node->msg_queue_size > 0) {
// 处理消息
}
}
来看看原AI代码是怎么写的(已简化注释):
while (WaitForSingleObject(consensus_complete_event, 50) == WAIT_TIMEOUT) {
// 线程在这里“睡觉”,直到有人调用 SetEvent 唤醒它,或者 50ms 超时
WaitForSingleObject(node->msg_queue_event, 50);
EnterCriticalSection(&node->msg_cs);
if (node->msg_queue_size > 0) {
// ... 取出消息处理 ...
逻辑图解:
[ 发送方线程 ] [ 接收方 Node 1 线程 ]
| |
1. 组装 Message | --> 正在休眠 (WaitForSingleObject)
2. 申请目标锁 (EnterCriticalSection) | (不消耗任何 CPU 资源)
3. 将消息塞入 Node 1 的 queue |
4. SetEvent(Node1_Event) --(叮咚!敲门)----->|
5. 释放锁 (LeaveCriticalSection) | --> 被内核唤醒!
|
6. 申请自己的锁 (EnterCriticalSection)
7. 取出队列消息
8. 进入 PBFT 状态机处理
这种设计叫 Reactor / 事件驱动模型,在嵌入式RTOS或高性能服务器(如Nginx、Redis)中极为常见。
四、 核心优化:手撕 O(N) 队列,换装 O(1) 环形缓冲区 (Ring Buffer)
原代码很棒,但在数据结构上有一个足以让底层程序员脑溢血的BUG。
4.1 痛点剖析:极其低效的数组平移
看原代码第 176-180 行:
Message msg = node->msg_queue[0]; // 取出第一条消息
// 灾难开始:将后面的所有消息往前挪动一格
for (int i = 0; i < node->msg_queue_size - 1; i++) {
node->msg_queue[i] = node->msg_queue[i + 1];
}
node->msg_queue_size--;
逻辑分析: 每次出一个元素,都要把后面的所有元素在内存中移动一次。如果队列里有1000个消息,每处理1个都要拷贝999次。算法复杂度是 $O(N)$。在网络高吞吐量下,CPU全在做无用的内存拷贝,直接卡死。
4.2 破局之道:环形队列 (Ring Buffer)
学过数据结构的都知道,解决这个问题的标准答案是环形队列。我们只需要维护两个指针(索引):head(出队位置)和 tail(入队位置)。取数据和存数据的时间复杂度都是绝对的 $O(1)$!
思维导图:环形缓冲区的精髓
空队列:head == tail
[ ][ ][ ][ ][ ]
^
head/tail
入队 3 个元素后:
[A][B][C][ ][ ]
^ ^
head tail
出队 2 个元素后:
[ ][ ][C][ ][ ]
^ ^
head tail
重点:到了数组尾部怎么办?利用取模运算 % 折返!
4.3 极限优化:替换 C 语言代码
让我们动手,把 Node 结构体和收发逻辑彻底爆改!
第一步:升级 Node 结构体
#define QUEUE_CAPACITY 50 // 队列容量
typedef struct {
int id;
NodeType type;
NodeState state;
int sequence;
int prepare_count[N_NODES];
int commit_count[N_NODES];
BOOL has_sent_prepare;
BOOL has_sent_commit;
// --- 锁和事件保持不变 ---
CRITICAL_SECTION msg_cs;
HANDLE msg_queue_event;
// --- 【升级部分:环形队列组件】 ---
Message msg_queue[QUEUE_CAPACITY];
int head; // 出队索引
int tail; // 入队索引
int count;// 当前队列中的消息总数
} Node;
第二步:重写节点初始化代码 (在 main 函数中)
// 原代码: nodes[i].msg_queue_size = 0;
// 优化为:
nodes[i].head = 0;
nodes[i].tail = 0;
nodes[i].count = 0;
第三步:重写发送消息函数 (入队逻辑)
void send_message_to_node(int sender, int target, MsgType type, int sequence) {
if (target < 0 || target >= N_NODES) return;
if (nodes[sender].type == BYZANTINE) {
if (rand() % 100 < 30) return; // 模拟拜占庭丢包
}
simulate_network_delay();
EnterCriticalSection(&nodes[target].msg_cs);
// 【优化核心:O(1) 环形入队】
if (nodes[target].count < QUEUE_CAPACITY) { // 检查队列是否已满
int tail = nodes[target].tail;
nodes[target].msg_queue[tail].sender = sender;
nodes[target].msg_queue[tail].target = target;
nodes[target].msg_queue[tail].type = type;
nodes[target].msg_queue[tail].sequence = sequence;
// 核心算法:移动 tail 指针,利用 % 实现绕环
nodes[target].tail = (tail + 1) % QUEUE_CAPACITY;
nodes[target].count++; // 总数加1
SetEvent(nodes[target].msg_queue_event); // 唤醒目标线程
EnterCriticalSection(&global_cs);
total_messages_sent++;
LeaveCriticalSection(&global_cs);
} else {
// 真实的系统在这里应该有日志警告:队列溢出丢包!
// printf("Warning: Node %d queue full!\n", target);
}
LeaveCriticalSection(&nodes[target].msg_cs);
}
第四步:重写线程接收循环 (出队逻辑)
// 在 NodeThread 内部的 while 循环中:
EnterCriticalSection(&node->msg_cs);
if (node->count > 0) { // 判断队列非空
// 【优化核心:O(1) 环形出队】
int head = node->head;
Message msg = node->msg_queue[head]; // 直接获取头元素
// 核心算法:移动 head 指针,再也不用移动整个数组了!
node->head = (head + 1) % QUEUE_CAPACITY;
node->count--; // 总数减1
// 如果队列空了,重置事件,让线程下次能继续休眠
if (node->count == 0) {
ResetEvent(node->msg_queue_event);
}
LeaveCriticalSection(&node->msg_cs);
// 根据取出的 msg 进入 PBFT 状态机...
switch (msg.type) { ... }
} else {
LeaveCriticalSection(&node->msg_cs);
}
4.4 优化对比总结表
|
维度 |
原生数组平移实现 |
环形队列 (Ring Buffer) 实现 |
胜出者 |
|---|---|---|---|
|
入队时间复杂度 |
$O(1)$ 直接末尾赋值 |
$O(1)$ 指针赋值并取模 |
平局 |
|
出队时间复杂度 |
$O(N)$ 极慢,需大面积内存拷贝 |
$O(1)$ 极快,仅移动一个索引 |
环形队列秒杀 |
|
CPU 资源消耗 |
高(伴随大量无意义指令执行) |
极低(几条汇编指令搞定) |
环形队列 |
|
工业应用场景 |
新手写的大作业代码 |
嵌入式串口通信、Linux内核网卡驱动、高并发服务器 |
环形队列 |
到目前为止,我们已经为这个 PBFT 仿真系统打造了一个坚不可摧、性能爆表的多线程底层引擎。
但是,分布式共识的真正难点在于协议本身的攻防。 目前原代码中的拜占庭节点仅仅是个只会“丢包(断网)”的铁憨憨,这叫崩溃容错(CFT),不叫拜占庭容错(BFT)!真正的拜占庭节点是会“两面三刀”、“伪造篡改数据”的。
PBFT 的协议层:
-
破解三阶段逻辑:Pre-prepare -> Prepare -> Commit 到底在防什么?
-
升级拜占庭节点:教你手写一段“恶意篡改序列号”的代码,看看 PBFT 是如何通过 $2f+1$ 的投票机制抵御这种攻击的!
-
打破上帝视角:移除全局锁,让每个节点真正实现去中心化的独立判断。
-------------------------------------------------------------------------------------------------------------------------------
在上篇中,我们把高大上的网络节点降维成了“带锁的环形队列+事件驱动的独立线程”。如果你对并发、锁、队列有了感觉,那么恭喜你,你已经具备了手撕任何分布式协议的地基。
但今天我们要进入深水区。面试官最喜欢挖坑的地方来了:“PBFT 为什么要分三个阶段?两个阶段不行吗?”、“如果主节点(Primary)作恶,给一半人发A,给一半人发B,你怎么防?”
如果你拿着原 AI 生成的那份代码去答,绝对会被扫地出门。原代码里的拜占庭节点只会 rand() % 100 < 30 丢丢包,这叫网络延迟/节点崩溃(CFT,如 Raft 算法处理的场景),根本不叫拜占庭故障(BFT,节点不仅会死,还会主动骗你)。
今天带你把这份代码改造成真·硬核拜占庭容错!
一、 PBFT 灵魂三阶段:为什么不能少?(总-分-总)
1.1 核心痛点:网络是不可靠的,节点是会骗人的 (总)
在讲代码之前,先看 PBFT 最经典的 $3f+1$ 和 Quorum(法定人数)数学铁律。 原代码定义:N_NODES = 7, F_TOLERANCE = 2, QUORUM = 5。 这个 5 是怎么来的?它是 $2f+1$!
-
总人数 $N = 3f+1 = 7$。
-
坏人最多 $f=2$ 个。
-
为了保证好人绝对占优,法定投票数必须是 $\lceil (N+f+1)/2 \rceil = 5$。这就是 Quorum。
1.2 逻辑拆解:三阶段状态机图谱 (分)
PBFT 像极了公司里的一次极其官僚的“群签”。我们用树形结构和 C 语言状态机来对应:
[ Primary 节点 (主节点 Node 0) 发起提案 ]
└── 1. Pre-prepare (预准备阶段) -> 对应状态机:PRE_PREPARED
├── 动作:老大发话:“今年年终奖发 10 个月,大家看行不行?”
└── 本质:确定 Sequence (序列号) 和提案内容的绑定关系。
[ Replica 节点 (副本节点 Node 1~6) 收到提案 ]
└── 2. Prepare (准备阶段) -> 对应状态机:PREPARED (核心校验:搜集 Quorum 张票)
├── 动作:老二对所有人群发:“我同意老大说的发 10 个月。”
│ 老三对所有人群发:“我同意老大说的发 10 个月。”
└── 本质:确保所有【诚实节点】在当前的 View (视图) 下,对这个 Sequence 达成了一致。
❓ 灵魂拷问:到这步共识不就达成了吗?为什么还要第三步?
❗️ 致命缺陷:如果此时老大挂了,要换届(View Change),新老大不知道你们刚才同意了啥,系统会分叉!
[ 全网节点 (Node 0~6) 确认 Prepare ]
└── 3. Commit (提交阶段) -> 对应状态机:COMMITTED (终极校验:再次搜集 Quorum 张票)
├── 动作:老二再次群发:“我看到至少5个人(包括我)同意了,我准备写入硬盘了!”
└── 本质:这是为了【跨视图(跨届)的安全】。只要一个提案走到了 Commit,哪怕网络瞬间断网、领导层大换血,这个决定也永远无法被推翻。
1.3 状态与条件映射表 (总)
在 C 代码的 handle_xxx 函数中,其实就是在做下面这张表的逻辑判断:
|
阶段函数 |
触发条件 (收到什么消息) |
内部判断逻辑 (转移条件) |
状态变更 (State) |
动作 (向外发什么) |
|---|---|---|---|---|
|
|
|
消息序列号合法且未处理过 |
|
广播 |
|
|
|
|
|
广播 |
|
|
|
|
|
写入本地账本 (成功) |
二、 刺透代码:手写“两面三刀”的拜占庭攻击
原代码的 if (rand() % 100 < 30) return; 简直是过家家。 什么是真正的拜占庭?伪造身份、重播攻击、篡改数据、两面三刀 (Equivocation)。
我们直接在 C 语言的 send_message_to_node 函数里注入灵魂,让拜占庭节点在发 PREPARE 票时,故意给一半的节点发合法的序列号,给另一半节点发一个恶意的、篡改过的序列号!
2.1 硬核代码:两面派攻击 (Equivocation Attack) 实现
// 升级版:带“两面三刀”攻击的发送函数
void send_message_to_node(int sender, int target, MsgType type, int sequence) {
if (target < 0 || target >= N_NODES) return;
int send_sequence = sequence; // 默认发送诚实的序列号
// 【硬核注入:真·拜占庭逻辑】
if (nodes[sender].type == BYZANTINE) {
// 攻击模式 1:随机装死 (保留原有的崩溃容错测试)
if (rand() % 100 < 10) return;
// 攻击模式 2:两面三刀 (Equivocation)
// 故意向编号为偶数的节点发送合法的 sequence,
// 向编号为奇数的节点发送一个篡改的恶意 sequence (例如 9999)
// 目的:试图撕裂网络,让一半人同意 seq 1,一半人同意 seq 9999
if (type == PREPARE || type == COMMIT) {
if (target % 2 != 0) {
send_sequence = 9999;
// printf("[攻击] 拜占庭节点 %d 对 节点 %d 投毒!Seq修改为9999\n", sender, target);
}
}
}
simulate_network_delay();
EnterCriticalSection(&nodes[target].msg_cs);
if (nodes[target].count < QUEUE_CAPACITY) {
int tail = nodes[target].tail;
nodes[target].msg_queue[tail].sender = sender;
nodes[target].msg_queue[tail].target = target;
nodes[target].msg_queue[tail].type = type;
// 发送被拜占庭节点可能篡改过的 sequence!
nodes[target].msg_queue[tail].sequence = send_sequence;
nodes[target].tail = (tail + 1) % QUEUE_CAPACITY;
nodes[target].count++;
SetEvent(nodes[target].msg_queue_event);
}
LeaveCriticalSection(&nodes[target].msg_cs);
}
2.2 为什么 PBFT 不怕这种攻击?
如果你跑了上面这段投毒代码,你会发现:好人们依然能够顺利达成共识(到达 COMMITTED 状态)! 这就是 PBFT 的牛逼之处,也是为什么 Quorum 必须是 $2f+1$ 的底层物理学:
-
撕裂失败:拜占庭节点给奇数节点发
9999,奇数节点收到后,发现msg->sequence != node->sequence(原代码中的校验逻辑),直接丢弃。 -
票数压制:即便拜占庭节点把假票投进了计票器,但诚实节点有 $N - f = 5$ 个。这 5 个人互相投票,刚好能凑齐 $QUORUM = 5$ 票!坏蛋那 2 张废票,根本不足以改变大局。
三、 砸碎“上帝视角”:实现真·去中心化架构
原 AI 代码里最让人吐槽的,是这段“上帝视角”的代码:
// 原代码中的极度不合理之处
EnterCriticalSection(&global_cs);
if (!consensus_reached) {
consensus_reached = TRUE; // 上帝说:哦,共识达成了!
SetEvent(consensus_complete_event); // 上帝敲锣打鼓结束所有线程
}
LeaveCriticalSection(&global_cs);
底层老兵的咆哮: 在分布式环境里,根本不存在一个可以包揽全局的 global_cs (全局锁)!每个节点就是一台孤零零放在机房里的服务器,它能依靠的只有自己的内存状态!
我们要删掉全局锁,删掉全局事件,让每个节点“独立宣布胜利”。
3.1 去中心化的 Commit 处理逻辑重构
每个节点只在自己的 handle_commit 里,通过判断自己的 commit_count 数组,自己决定是否上链。
// 重构:原汁原味的去中心化节点逻辑
void handle_commit(Node *node, Message *msg) {
// 拜占庭节点瞎胡闹,它不按套路出牌
if (node->type == BYZANTINE) return;
// 核心安全校验:只收当前处理序列号的票
if (msg->sequence == node->sequence) {
// 防止重放攻击:同一个人只能投一票!原代码用 1 赋值而非 ++ 极其精妙
node->commit_count[msg->sender] = 1;
int commit_total = 0;
for (int i = 0; i < N_NODES; i++) {
commit_total += node->commit_count[i];
}
// 去中心化的胜利宣告!
// 只要满足法定人数 (QUORUM),并且处于 PREPARED 状态
if (commit_total >= QUORUM && node->state == PREPARED) {
node->state = COMMITTED;
// 没有什么全局锁,没有全局事件!
// 只有自己机房里这台机器默默地写盘、落库
printf("[节点 %02d 独立宣告] 收集到 %d 张 Commit 票,Sequence %d 正式写入本地账本!\n",
node->id, commit_total, node->sequence);
// 真实系统中,这里会触发持久化机制 (写入 LevelDB/RocksDB)
// write_to_disk(node->sequence, "Transaction Data");
}
}
}
3.2 Main 函数主循环的涅槃重构
既然没有了上帝视角的 consensus_complete_event,我们的主线程 main 怎么知道仿真结束了? 答案是:等待超时 (Timeout),或者在外部做一个监控线程不断轮询各自的状态(类似 Prometheus 拉取监控指标)。
我们将原主循环改为更硬核的线程汇合 (Join) 与状态探针机制:
int main() {
// ... 前置初始化与创建线程 (同原代码,但移除了 global_cs) ...
printf("--- 硬核去中心化 PBFT 仿真启动 ---\n");
printf("主节点 Node 0 即将发起提案...\n");
// 给系统 3 秒钟的时间去网状通信、达成共识
// 真实场景下,节点是永不退出的死循环。
Sleep(3000);
// --- 监控探针阶段 (取代上帝视角的暴力判断) ---
printf("\n=== 3秒通信时间到,主线程作为外部监控开始探针审计 ===\n");
printf("\n%-8s | %-10s | %-12s | %-10s | %-10s\n",
"节点ID", "身份", "最终状态", "Prepare票", "Commit票");
printf("------------------------------------------------------------\n");
int honest_committed_count = 0;
for (int i = 0; i < N_NODES; i++) {
int prep_total = 0, comm_total = 0;
for (int j = 0; j < N_NODES; j++) {
prep_total += nodes[i].prepare_count[j];
comm_total += nodes[i].commit_count[j];
}
char *node_type = (nodes[i].type == BYZANTINE) ? "★拜占庭" : "诚实节点";
char *state_str = "未达共识";
if (nodes[i].state == COMMITTED) {
state_str = "已写入账本";
if (nodes[i].type == HONEST) honest_committed_count++;
}
printf("Node %02d | %-10s | %-12s | %-10d | %-10d\n",
i, node_type, state_str, prep_total, comm_total);
}
printf("------------------------------------------------------------\n");
// 最终业务层研判:只要大多数诚实节点(至少 f+1)提交了,系统就是安全的
if (honest_committed_count >= (F_TOLERANCE + 1)) {
printf(">> 结论:系统在遭受拜占庭攻击下,【成功】保住了共识一致性!\n");
} else {
printf(">> 结论:系统被攻破或发生分叉!\n");
}
// 强杀模拟线程,清理资源
// ...
return 0;
}
四、 总结与升华:从 C 语言到工业级架构
经过上、下两篇的彻底爆改,这份原本勉强能跑的 AI 代码,已经被我们改造出了工业级底层的雏形:
核心重构对照表
|
维度 |
AI 初始版本 (大作业水平) |
咱们的爆改版本 (资深开发水平) |
|---|---|---|
|
内存与性能 |
O(N) 数组平移,CPU浪费严重 |
O(1) 环形缓冲区,极限压榨CPU性能 |
|
并发模型 |
带有上帝视角的全局锁,伪分布式 |
全异步事件驱动 Reactor模型,彻底去中心化 |
|
拜占庭逻辑 |
仅模拟断网丢包 (CFT) |
注入两面三刀 (Equivocation) 攻击逻辑,真BFT压测 |
|
防重放攻击 |
代码已有体现 ( |
深入讲解了为何不用 |
最后,致各位正在啃底层、刷算法的兄弟们: 很多人觉得区块链全是发币骗人的,但剥去金融的外衣,像 PBFT、Raft、Paxos 这样的分布式共识算法,是目前计算机科学领域最耀眼的明珠之一。
当你用 C 语言的指针、锁、状态机亲手把它们拼装起来,看着线程在并发的混乱中,通过精妙的投票数学($3f+1$)最终走向秩序和统一时,那种作为系统架构师的掌控感和爽感,是写多少个增删改查业务都换不来的!
这篇万字拆解呕心沥血,图表、代码全手打。如果你能看到这里,说明你绝对是个追求极致的技术极客!!!
附录,300行源码:
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>
#define N_NODES 7 // Reduced to 7 nodes for better testing (f=2, quorum=5)
#define F_TOLERANCE 2
#define QUORUM 5
#define MIN_LATENCY 10 // Reduced delay for faster execution
#define MAX_LATENCY 50
#define BYZANTINE_NODES 1 // Only 1 Byzantine node
// Message types
typedef enum
{
PRE_PREPARE,
PREPARE,
COMMIT
} MsgType;
// Node types
typedef enum
{
HONEST,
BYZANTINE
} NodeType;
// Message structure
typedef struct
{
int sender;
int target;
MsgType type;
int sequence;
} Message;
// Node states
typedef enum
{
IDLE,
PRE_PREPARED,
PREPARED,
COMMITTED
} NodeState;
// Node structure
typedef struct
{
int id;
NodeType type;
NodeState state;
int sequence;
int prepare_count[N_NODES];
int commit_count[N_NODES];
BOOL has_sent_prepare;
BOOL has_sent_commit;
CRITICAL_SECTION msg_cs;
HANDLE msg_queue_event;
Message msg_queue[50];
int msg_queue_size;
} Node;
// Global variables
Node nodes[N_NODES];
CRITICAL_SECTION global_cs;
HANDLE consensus_complete_event;
BOOL consensus_reached = FALSE;
int total_messages_sent = 0;
// Simulate network delay
void simulate_network_delay()
{
int delay = MIN_LATENCY + rand() % (MAX_LATENCY - MIN_LATENCY);
if ((rand() % 100) < 10)
{ // 10% chance of extra delay
delay += 100;
}
Sleep(delay);
}
// Send message to specific node
void send_message_to_node(int sender, int target, MsgType type, int sequence)
{
if (target < 0 || target >= N_NODES)
return;
// Byzantine nodes may drop or corrupt messages
if (nodes[sender].type == BYZANTINE)
{
if (rand() % 100 < 30)
{ // 30% chance to drop message
return;
}
}
simulate_network_delay();
EnterCriticalSection(&nodes[target].msg_cs);
if (nodes[target].msg_queue_size < 50)
{
nodes[target].msg_queue[nodes[target].msg_queue_size].sender = sender;
nodes[target].msg_queue[nodes[target].msg_queue_size].target = target;
nodes[target].msg_queue[nodes[target].msg_queue_size].type = type;
nodes[target].msg_queue[nodes[target].msg_queue_size].sequence = sequence;
nodes[target].msg_queue_size++;
SetEvent(nodes[target].msg_queue_event);
EnterCriticalSection(&global_cs);
total_messages_sent++;
LeaveCriticalSection(&global_cs);
}
LeaveCriticalSection(&nodes[target].msg_cs);
}
// Broadcast message to all nodes
void broadcast_message(int sender, MsgType type, int sequence)
{
for (int i = 0; i < N_NODES; i++)
{
if (i != sender)
{
send_message_to_node(sender, i, type, sequence);
}
}
}
// Handle Pre-prepare message
void handle_pre_prepare(Node *node, Message *msg)
{
if (node->type == BYZANTINE)
{
if (rand() % 100 < 40)
{
return;
}
}
if (msg->sequence == node->sequence)
{
node->state = PRE_PREPARED;
if (!node->has_sent_prepare)
{
node->has_sent_prepare = TRUE;
broadcast_message(node->id, PREPARE, msg->sequence);
}
}
}
// Handle Prepare message
void handle_prepare(Node *node, Message *msg)
{
if (node->type == BYZANTINE)
{
if (rand() % 100 < 20)
{
return;
}
}
if (msg->sequence == node->sequence)
{
node->prepare_count[msg->sender] = 1;
int prepare_total = 0;
for (int i = 0; i < N_NODES; i++)
{
prepare_total += node->prepare_count[i];
}
if (prepare_total >= QUORUM && node->state == PRE_PREPARED)
{
node->state = PREPARED;
if (!node->has_sent_commit)
{
node->has_sent_commit = TRUE;
broadcast_message(node->id, COMMIT, msg->sequence);
}
}
}
}
// Handle Commit message
void handle_commit(Node *node, Message *msg)
{
if (node->type == BYZANTINE)
{
if (rand() % 100 < 10)
{
return;
}
}
if (msg->sequence == node->sequence)
{
node->commit_count[msg->sender] = 1;
int commit_total = 0;
for (int i = 0; i < N_NODES; i++)
{
commit_total += node->commit_count[i];
}
if (commit_total >= QUORUM && node->state == PREPARED)
{
node->state = COMMITTED;
printf("[Node %02d] Consensus reached! Sequence: %d\n", node->id, node->sequence);
EnterCriticalSection(&global_cs);
if (!consensus_reached)
{
consensus_reached = TRUE;
SetEvent(consensus_complete_event);
}
LeaveCriticalSection(&global_cs);
}
}
}
// Node main thread
DWORD WINAPI NodeThread(LPVOID lpParam)
{
Node *node = (Node *)lpParam;
// Primary node (node 0) initiates the request
if (node->id == 0 && node->type == HONEST)
{
Sleep(50); // Wait for other nodes to start
printf("Primary Node 00 initiating proposal (Pre-prepare phase)...\n\n");
broadcast_message(0, PRE_PREPARE, node->sequence);
}
// Message processing loop
while (WaitForSingleObject(consensus_complete_event, 50) == WAIT_TIMEOUT)
{
// self注释:外层循环作用:循环等待consensus_complete_event,如果consensus_complete_event没有被触发,则进入内层循环,处理消息。一旦事件被触发,则结束外层循环,进入内层循环。
WaitForSingleObject(node->msg_queue_event, 50);
// 内层的循环:循环等待节点自己的消息msg_queue,如果msg_queue有消息,则进入内层循环,处理消息。
EnterCriticalSection(&node->msg_cs);
if (node->msg_queue_size > 0)
{
Message msg = node->msg_queue[0];
for (int i = 0; i < node->msg_queue_size - 1; i++)
{
node->msg_queue[i] = node->msg_queue[i + 1];
}
node->msg_queue_size--;
ResetEvent(node->msg_queue_event);
LeaveCriticalSection(&node->msg_cs);
switch (msg.type)
{
case PRE_PREPARE:
handle_pre_prepare(node, &msg);
break;
case PREPARE:
handle_prepare(node, &msg);
break;
case COMMIT:
handle_commit(node, &msg);
break;
default:
break;
}
}
else
{
LeaveCriticalSection(&node->msg_cs);
}
}
return 0;
}
int main()
{
srand(time(NULL));
InitializeCriticalSection(&global_cs);
consensus_complete_event = CreateEvent(NULL, TRUE, FALSE, NULL);
printf("--- Optimized PBFT Simulation Started ---\n");
printf("Nodes: %d, Fault Tolerance: %d, Byzantine Nodes: %d\n",
N_NODES, F_TOLERANCE, BYZANTINE_NODES);
// Initialize all nodes
for (int i = 0; i < N_NODES; i++)
{
nodes[i].id = i;
// Make sure primary node (node 0) is always honest
nodes[i].type = (i == 0) ? HONEST : (i <= BYZANTINE_NODES) ? BYZANTINE
: HONEST;
nodes[i].state = IDLE;
nodes[i].sequence = 1;
nodes[i].has_sent_prepare = FALSE;
nodes[i].has_sent_commit = FALSE;
nodes[i].msg_queue_size = 0;
InitializeCriticalSection(&nodes[i].msg_cs);
nodes[i].msg_queue_event = CreateEvent(NULL, FALSE, FALSE, NULL);
memset(nodes[i].prepare_count, 0, sizeof(nodes[i].prepare_count));
memset(nodes[i].commit_count, 0, sizeof(nodes[i].commit_count));
}
// Create threads for all nodes
HANDLE threads[N_NODES];
for (int i = 0; i < N_NODES; i++)
{
threads[i] = CreateThread(NULL, 0, NodeThread, &nodes[i], 0, NULL);
}
// Wait for consensus or timeout (5 seconds)
if (WaitForSingleObject(consensus_complete_event, 5000) == WAIT_OBJECT_0)
{
printf("\n=== CONSENSUS SUCCESSFULLY REACHED! ===\n");
}
else
{
printf("\n=== CONSENSUS TIMEOUT - FAILED TO REACH AGREEMENT ===\n");
}
// Wait for all threads to finish
WaitForMultipleObjects(N_NODES, threads, TRUE, 1000);
// Print statistics
printf("\n%-8s | %-10s | %-12s | %-10s | %-10s\n", "NodeID", "Type", "State", "PrepCount", "CommCount");
printf("------------------------------------------------------------\n");
for (int i = 0; i < N_NODES; i++)
{
int prep_total = 0, comm_total = 0;
for (int j = 0; j < N_NODES; j++)
{
prep_total += nodes[i].prepare_count[j];
comm_total += nodes[i].commit_count[j];
}
char *node_type = (nodes[i].type == BYZANTINE) ? "Byzantine" : "Honest";
char *state_str;
switch (nodes[i].state)
{
case IDLE:
state_str = "Idle";
break;
case PRE_PREPARED:
state_str = "PrePrep";
break;
case PREPARED:
state_str = "Prepared";
break;
case COMMITTED:
state_str = "Committed";
break;
default:
state_str = "Unknown";
break;
}
printf("Node %02d | %-10s | %-12s | %-10d | %-10d\n",
i, node_type, state_str, prep_total, comm_total);
}
printf("------------------------------------------------------------\n");
printf("Total Network Messages: %d\n", total_messages_sent);
// Cleanup
CloseHandle(consensus_complete_event);
for (int i = 0; i < N_NODES; i++)
{
DeleteCriticalSection(&nodes[i].msg_cs);
CloseHandle(nodes[i].msg_queue_event);
}
DeleteCriticalSection(&global_cs);
return 0;
}
更多推荐



所有评论(0)