---------------------------------------------------------------------------------------------------------------------------                                                                                                                      更新于2026.3.30

 

如果一听到“区块链”、“PBFT共识算法”、“拜占庭容错”这些词就觉得高大上、云里雾里——那这篇文章就是为你量身定制的。

别被高大上的名词唬住了。抛开炒币的噱头,分布式共识系统,在底层C程序员眼里,就是一个多线程(多进程)通过网络互相发结构体,用状态机维护自身状态,并用锁和事件机制解决并发冲突的系统。

今天,咱们就拿一份通义灵码写的PBFT仿真代码开刀。这份代码作为原型非常棒,跨越了单线程递归的玩具级别,上了真·多线程并发。但站在工业级底层开发的角度,它还有硬伤。

今天,我们就来一场“代码级解剖”,不仅要看懂,还要把它改得更牛逼。看完这篇,面试官问你分布式,你可以直接拿底层的并发和队列去碾压他。如果觉得干货够硬,别忘了点赞、收藏、关注一波,老哥的心血不能白费!

一、 总览:PBFT到底在干嘛?(总-分-总解析)

1.1 核心思想 (总)

一句话概括:在一群节点(机器)里,有几台可能宕机,甚至可能恶意发假消息(拜占庭节点)。PBFT算法能保证只要好人的数量足够多,整个系统就能达成一致的决定。

数学铁律:N = 3f + 1

  • N:总节点数

  • f:能容忍的最大坏蛋/故障节点数。

  • 为什么是 3f+1?因为坏人有 f 个,为了压制坏人,好人必须比坏人多,且在网络分区时也能达成多数派。

1.2 角色与状态映射 (分)

在C语言的视角下,我们把高大上的概念翻译成代码实体:

区块链/分布式名词

C语言/底层系统映射

原代码实现

Node (节点)

一个独立的线程 (Thread)

DWORD WINAPI NodeThread

P2P网络广播

内存拷贝 + 锁并发控制

EnterCriticalSection + 数组赋值

网络延迟/丢包

随机数生成器 + 线程休眠

Sleep() + rand()

共识状态机

enum 变量 + switch-case

NodeState state

事件通知

操作系统事件对象/信号量

Windows API CreateEvent

1.3 小结:多线程单机仿真 (总)

我们的仿真环境,就是用一台电脑上的多个线程,模拟分布在世界各地的多台服务器。线程间通过读写彼此的“消息队列(数组)”来模拟网络通信。既然是多线程读写共享变量,那**临界区(锁)**就绝对跑不掉。

二、 数据结构扒皮:一个节点的五脏六腑

我们先来看原代码中最核心的数据结构 Node。看懂了它,你就懂了70%。

2.1 Node结构体 ER模型分析

一个完整的PBFT节点,不仅要有身份证(ID),还要有账本(记票器)、状态机、以及最重要的——收件箱(带锁的队列)。

[ Node (节点实体) ]
 ├─ 基础属性
 │   ├─ id: int (节点编号 0~6)
 │   └─ type: NodeType (好人 HONEST / 坏人 BYZANTINE)
 │
 ├─ PBFT核心状态机
 │   ├─ state: NodeState (当前状态:IDLE -> PRE_PREPARED -> PREPARED -> COMMITTED)
 │   └─ sequence: int (当前正在处理的提案序列号)
 │
 ├─ 记票簿 (防双花、防作伪)
 │   ├─ prepare_count[N_NODES]: int数组 (记录收到了谁的Prepare票)
 │   ├─ commit_count[N_NODES]: int数组 (记录收到了谁的Commit票)
 │   ├─ has_sent_prepare: BOOL (防重发标记位)
 │   └─ has_sent_commit: BOOL (防重发标记位)
 │
 └─ 线程通信组件 (底层硬核部分)
     ├─ msg_cs: CRITICAL_SECTION (Windows临界区,相当于 pthread_mutex)
     ├─ msg_queue_event: HANDLE (Windows事件,相当于 pthread_cond / 信号量)
     ├─ msg_queue[50]: Message数组 (收件箱 / 消息队列)
     └─ msg_queue_size: int (当前队列长度)

2.2 为什么必须要有锁和事件?

很多刚学完指针的同学写仿真,就是直接 node[1].state = COMMITTED,这叫上帝视角,根本不是分布式。

真正的网络中,Node 0 想给 Node 1 发消息,本质是 Node 0 的线程要把数据写入 Node 1 的 msg_queue。 想象一下,如果 Node 2 和 Node 3 同时向 Node 1 发消息,不加锁会怎样? 内存踩踏! 数据全乱了。

所以,原代码使用 CRITICAL_SECTION 是非常地道的底层思维。

三、 并发架构与事件驱动 (Event-Driven)

3.1 摒弃死循环,拥抱事件驱动

一般初学者写线程,容易写成这样:

// 菜鸟写法:CPU 占用率 100% 狂飙
while(1) {
    if(node->msg_queue_size > 0) {
        // 处理消息
    }
}

来看看原AI代码是怎么写的(已简化注释):

while (WaitForSingleObject(consensus_complete_event, 50) == WAIT_TIMEOUT) {
    // 线程在这里“睡觉”,直到有人调用 SetEvent 唤醒它,或者 50ms 超时
    WaitForSingleObject(node->msg_queue_event, 50); 
    
    EnterCriticalSection(&node->msg_cs);
    if (node->msg_queue_size > 0) {
        // ... 取出消息处理 ...

逻辑图解:

[ 发送方线程 ]                         [ 接收方 Node 1 线程 ]
      |                                      |
 1. 组装 Message                              | --> 正在休眠 (WaitForSingleObject)
 2. 申请目标锁 (EnterCriticalSection)          |      (不消耗任何 CPU 资源)
 3. 将消息塞入 Node 1 的 queue                |
 4. SetEvent(Node1_Event) --(叮咚!敲门)----->|
 5. 释放锁 (LeaveCriticalSection)             | --> 被内核唤醒!
                                             |
                                        6. 申请自己的锁 (EnterCriticalSection)
                                        7. 取出队列消息
                                        8. 进入 PBFT 状态机处理

这种设计叫 Reactor / 事件驱动模型,在嵌入式RTOS或高性能服务器(如Nginx、Redis)中极为常见。

四、 核心优化:手撕 O(N) 队列,换装 O(1) 环形缓冲区 (Ring Buffer)

原代码很棒,但在数据结构上有一个足以让底层程序员脑溢血的BUG

4.1 痛点剖析:极其低效的数组平移

看原代码第 176-180 行:

Message msg = node->msg_queue[0]; // 取出第一条消息
// 灾难开始:将后面的所有消息往前挪动一格
for (int i = 0; i < node->msg_queue_size - 1; i++) {
    node->msg_queue[i] = node->msg_queue[i + 1];
}
node->msg_queue_size--;

逻辑分析: 每次出一个元素,都要把后面的所有元素在内存中移动一次。如果队列里有1000个消息,每处理1个都要拷贝999次。算法复杂度是 $O(N)$。在网络高吞吐量下,CPU全在做无用的内存拷贝,直接卡死。

4.2 破局之道:环形队列 (Ring Buffer)

学过数据结构的都知道,解决这个问题的标准答案是环形队列。我们只需要维护两个指针(索引):head(出队位置)和 tail(入队位置)。取数据和存数据的时间复杂度都是绝对的 $O(1)$!

思维导图:环形缓冲区的精髓

空队列:head == tail
       [ ][ ][ ][ ][ ]
        ^
      head/tail

入队 3 个元素后:
       [A][B][C][ ][ ]
        ^        ^
       head     tail

出队 2 个元素后:
       [ ][ ][C][ ][ ]
              ^  ^
           head tail

重点:到了数组尾部怎么办?利用取模运算 % 折返!

4.3 极限优化:替换 C 语言代码

让我们动手,把 Node 结构体和收发逻辑彻底爆改!

第一步:升级 Node 结构体

#define QUEUE_CAPACITY 50 // 队列容量

typedef struct {
    int id;
    NodeType type;
    NodeState state;
    int sequence;
    int prepare_count[N_NODES];
    int commit_count[N_NODES];
    BOOL has_sent_prepare;
    BOOL has_sent_commit;
    
    // --- 锁和事件保持不变 ---
    CRITICAL_SECTION msg_cs;
    HANDLE msg_queue_event;
    
    // --- 【升级部分:环形队列组件】 ---
    Message msg_queue[QUEUE_CAPACITY];
    int head; // 出队索引
    int tail; // 入队索引
    int count;// 当前队列中的消息总数
} Node;

第二步:重写节点初始化代码 (在 main 函数中)

// 原代码: nodes[i].msg_queue_size = 0;
// 优化为:
nodes[i].head = 0;
nodes[i].tail = 0;
nodes[i].count = 0;

第三步:重写发送消息函数 (入队逻辑)

void send_message_to_node(int sender, int target, MsgType type, int sequence) {
    if (target < 0 || target >= N_NODES) return;

    if (nodes[sender].type == BYZANTINE) {
        if (rand() % 100 < 30) return; // 模拟拜占庭丢包
    }

    simulate_network_delay();

    EnterCriticalSection(&nodes[target].msg_cs);
    // 【优化核心:O(1) 环形入队】
    if (nodes[target].count < QUEUE_CAPACITY) { // 检查队列是否已满
        int tail = nodes[target].tail;
        nodes[target].msg_queue[tail].sender = sender;
        nodes[target].msg_queue[tail].target = target;
        nodes[target].msg_queue[tail].type = type;
        nodes[target].msg_queue[tail].sequence = sequence;
        
        // 核心算法:移动 tail 指针,利用 % 实现绕环
        nodes[target].tail = (tail + 1) % QUEUE_CAPACITY;
        nodes[target].count++; // 总数加1
        
        SetEvent(nodes[target].msg_queue_event); // 唤醒目标线程

        EnterCriticalSection(&global_cs);
        total_messages_sent++;
        LeaveCriticalSection(&global_cs);
    } else {
        // 真实的系统在这里应该有日志警告:队列溢出丢包!
        // printf("Warning: Node %d queue full!\n", target);
    }
    LeaveCriticalSection(&nodes[target].msg_cs);
}

第四步:重写线程接收循环 (出队逻辑)

// 在 NodeThread 内部的 while 循环中:
EnterCriticalSection(&node->msg_cs);
if (node->count > 0) { // 判断队列非空
    // 【优化核心:O(1) 环形出队】
    int head = node->head;
    Message msg = node->msg_queue[head]; // 直接获取头元素
    
    // 核心算法:移动 head 指针,再也不用移动整个数组了!
    node->head = (head + 1) % QUEUE_CAPACITY;
    node->count--; // 总数减1
    
    // 如果队列空了,重置事件,让线程下次能继续休眠
    if (node->count == 0) {
        ResetEvent(node->msg_queue_event);
    }
    LeaveCriticalSection(&node->msg_cs);

    // 根据取出的 msg 进入 PBFT 状态机...
    switch (msg.type) { ... }
} else {
    LeaveCriticalSection(&node->msg_cs);
}

4.4 优化对比总结表

维度

原生数组平移实现

环形队列 (Ring Buffer) 实现

胜出者

入队时间复杂度

$O(1)$ 直接末尾赋值

$O(1)$ 指针赋值并取模

平局

出队时间复杂度

$O(N)$ 极慢,需大面积内存拷贝

$O(1)$ 极快,仅移动一个索引

环形队列秒杀

CPU 资源消耗

高(伴随大量无意义指令执行)

极低(几条汇编指令搞定)

环形队列

工业应用场景

新手写的大作业代码

嵌入式串口通信、Linux内核网卡驱动、高并发服务器

环形队列

到目前为止,我们已经为这个 PBFT 仿真系统打造了一个坚不可摧、性能爆表的多线程底层引擎

但是,分布式共识的真正难点在于协议本身的攻防。 目前原代码中的拜占庭节点仅仅是个只会“丢包(断网)”的铁憨憨,这叫崩溃容错(CFT),不叫拜占庭容错(BFT)!真正的拜占庭节点是会“两面三刀”、“伪造篡改数据”的。

 PBFT 的协议层:

  1. 破解三阶段逻辑:Pre-prepare -> Prepare -> Commit 到底在防什么?

  2. 升级拜占庭节点:教你手写一段“恶意篡改序列号”的代码,看看 PBFT 是如何通过 $2f+1$ 的投票机制抵御这种攻击的!

  3. 打破上帝视角:移除全局锁,让每个节点真正实现去中心化的独立判断。

-------------------------------------------------------------------------------------------------------------------------------

在上篇中,我们把高大上的网络节点降维成了“带锁的环形队列+事件驱动的独立线程”。如果你对并发、锁、队列有了感觉,那么恭喜你,你已经具备了手撕任何分布式协议的地基。

但今天我们要进入深水区。面试官最喜欢挖坑的地方来了:“PBFT 为什么要分三个阶段?两个阶段不行吗?”、“如果主节点(Primary)作恶,给一半人发A,给一半人发B,你怎么防?

如果你拿着原 AI 生成的那份代码去答,绝对会被扫地出门。原代码里的拜占庭节点只会 rand() % 100 < 30 丢丢包,这叫网络延迟/节点崩溃(CFT,如 Raft 算法处理的场景),根本不叫拜占庭故障(BFT,节点不仅会死,还会主动骗你)。

今天带你把这份代码改造成真·硬核拜占庭容错

一、 PBFT 灵魂三阶段:为什么不能少?(总-分-总)

1.1 核心痛点:网络是不可靠的,节点是会骗人的 (总)

在讲代码之前,先看 PBFT 最经典的 $3f+1$ 和 Quorum(法定人数)数学铁律。 原代码定义:N_NODES = 7, F_TOLERANCE = 2, QUORUM = 5。 这个 5 是怎么来的?它是 $2f+1$

  • 总人数 $N = 3f+1 = 7$。

  • 坏人最多 $f=2$ 个。

  • 为了保证好人绝对占优,法定投票数必须是 $\lceil (N+f+1)/2 \rceil = 5$。这就是 Quorum。

1.2 逻辑拆解:三阶段状态机图谱 (分)

PBFT 像极了公司里的一次极其官僚的“群签”。我们用树形结构和 C 语言状态机来对应:

[ Primary 节点 (主节点 Node 0) 发起提案 ]
 └── 1. Pre-prepare (预准备阶段) -> 对应状态机:PRE_PREPARED
     ├── 动作:老大发话:“今年年终奖发 10 个月,大家看行不行?”
     └── 本质:确定 Sequence (序列号) 和提案内容的绑定关系。

[ Replica 节点 (副本节点 Node 1~6) 收到提案 ]
 └── 2. Prepare (准备阶段) -> 对应状态机:PREPARED (核心校验:搜集 Quorum 张票)
     ├── 动作:老二对所有人群发:“我同意老大说的发 10 个月。”
     │          老三对所有人群发:“我同意老大说的发 10 个月。”
     └── 本质:确保所有【诚实节点】在当前的 View (视图) 下,对这个 Sequence 达成了一致。
         ❓ 灵魂拷问:到这步共识不就达成了吗?为什么还要第三步?
         ❗️ 致命缺陷:如果此时老大挂了,要换届(View Change),新老大不知道你们刚才同意了啥,系统会分叉!

[ 全网节点 (Node 0~6) 确认 Prepare ]
 └── 3. Commit (提交阶段) -> 对应状态机:COMMITTED (终极校验:再次搜集 Quorum 张票)
     ├── 动作:老二再次群发:“我看到至少5个人(包括我)同意了,我准备写入硬盘了!”
     └── 本质:这是为了【跨视图(跨届)的安全】。只要一个提案走到了 Commit,哪怕网络瞬间断网、领导层大换血,这个决定也永远无法被推翻。

1.3 状态与条件映射表 (总)

在 C 代码的 handle_xxx 函数中,其实就是在做下面这张表的逻辑判断:

阶段函数

触发条件 (收到什么消息)

内部判断逻辑 (转移条件)

状态变更 (State)

动作 (向外发什么)

handle_pre_prepare

PRE_PREPARE 消息

消息序列号合法且未处理过

IDLE -> PRE_PREPARED

广播 PREPARE

handle_prepare

PREPARE 消息

prepare_count $\ge$ QUORUM

PRE_PREPARED -> PREPARED

广播 COMMIT

handle_commit

COMMIT 消息

commit_count $\ge$ QUORUM

PREPARED -> COMMITTED

写入本地账本 (成功)

二、 刺透代码:手写“两面三刀”的拜占庭攻击

原代码的 if (rand() % 100 < 30) return; 简直是过家家。 什么是真正的拜占庭?伪造身份、重播攻击、篡改数据、两面三刀 (Equivocation)

我们直接在 C 语言的 send_message_to_node 函数里注入灵魂,让拜占庭节点在发 PREPARE 票时,故意给一半的节点发合法的序列号,给另一半节点发一个恶意的、篡改过的序列号

2.1 硬核代码:两面派攻击 (Equivocation Attack) 实现

// 升级版:带“两面三刀”攻击的发送函数
void send_message_to_node(int sender, int target, MsgType type, int sequence) {
    if (target < 0 || target >= N_NODES) return;

    int send_sequence = sequence; // 默认发送诚实的序列号

    // 【硬核注入:真·拜占庭逻辑】
    if (nodes[sender].type == BYZANTINE) {
        // 攻击模式 1:随机装死 (保留原有的崩溃容错测试)
        if (rand() % 100 < 10) return; 

        // 攻击模式 2:两面三刀 (Equivocation)
        // 故意向编号为偶数的节点发送合法的 sequence,
        // 向编号为奇数的节点发送一个篡改的恶意 sequence (例如 9999)
        // 目的:试图撕裂网络,让一半人同意 seq 1,一半人同意 seq 9999
        if (type == PREPARE || type == COMMIT) {
            if (target % 2 != 0) {
                send_sequence = 9999; 
                // printf("[攻击] 拜占庭节点 %d 对 节点 %d 投毒!Seq修改为9999\n", sender, target);
            }
        }
    }

    simulate_network_delay();

    EnterCriticalSection(&nodes[target].msg_cs);
    if (nodes[target].count < QUEUE_CAPACITY) {
        int tail = nodes[target].tail;
        nodes[target].msg_queue[tail].sender = sender;
        nodes[target].msg_queue[tail].target = target;
        nodes[target].msg_queue[tail].type = type;
        
        // 发送被拜占庭节点可能篡改过的 sequence!
        nodes[target].msg_queue[tail].sequence = send_sequence; 
        
        nodes[target].tail = (tail + 1) % QUEUE_CAPACITY;
        nodes[target].count++;
        SetEvent(nodes[target].msg_queue_event);
    }
    LeaveCriticalSection(&nodes[target].msg_cs);
}

2.2 为什么 PBFT 不怕这种攻击?

如果你跑了上面这段投毒代码,你会发现:好人们依然能够顺利达成共识(到达 COMMITTED 状态)! 这就是 PBFT 的牛逼之处,也是为什么 Quorum 必须是 $2f+1$ 的底层物理学:

  1. 撕裂失败:拜占庭节点给奇数节点发 9999,奇数节点收到后,发现 msg->sequence != node->sequence (原代码中的校验逻辑),直接丢弃。

  2. 票数压制:即便拜占庭节点把假票投进了计票器,但诚实节点有 $N - f = 5$ 个。这 5 个人互相投票,刚好能凑齐 $QUORUM = 5$ 票!坏蛋那 2 张废票,根本不足以改变大局。

三、 砸碎“上帝视角”:实现真·去中心化架构

原 AI 代码里最让人吐槽的,是这段“上帝视角”的代码:

// 原代码中的极度不合理之处
EnterCriticalSection(&global_cs);
if (!consensus_reached) {
    consensus_reached = TRUE; // 上帝说:哦,共识达成了!
    SetEvent(consensus_complete_event); // 上帝敲锣打鼓结束所有线程
}
LeaveCriticalSection(&global_cs);

底层老兵的咆哮: 在分布式环境里,根本不存在一个可以包揽全局的 global_cs (全局锁)!每个节点就是一台孤零零放在机房里的服务器,它能依靠的只有自己的内存状态!

我们要删掉全局锁,删掉全局事件,让每个节点“独立宣布胜利”

3.1 去中心化的 Commit 处理逻辑重构

每个节点只在自己的 handle_commit 里,通过判断自己的 commit_count 数组,自己决定是否上链。

// 重构:原汁原味的去中心化节点逻辑
void handle_commit(Node *node, Message *msg) {
    // 拜占庭节点瞎胡闹,它不按套路出牌
    if (node->type == BYZANTINE) return; 

    // 核心安全校验:只收当前处理序列号的票
    if (msg->sequence == node->sequence) {
        // 防止重放攻击:同一个人只能投一票!原代码用 1 赋值而非 ++ 极其精妙
        node->commit_count[msg->sender] = 1;

        int commit_total = 0;
        for (int i = 0; i < N_NODES; i++) {
            commit_total += node->commit_count[i];
        }

        // 去中心化的胜利宣告!
        // 只要满足法定人数 (QUORUM),并且处于 PREPARED 状态
        if (commit_total >= QUORUM && node->state == PREPARED) {
            node->state = COMMITTED;
            
            // 没有什么全局锁,没有全局事件!
            // 只有自己机房里这台机器默默地写盘、落库
            printf("[节点 %02d 独立宣告] 收集到 %d 张 Commit 票,Sequence %d 正式写入本地账本!\n", 
                   node->id, commit_total, node->sequence);
                   
            // 真实系统中,这里会触发持久化机制 (写入 LevelDB/RocksDB)
            // write_to_disk(node->sequence, "Transaction Data");
        }
    }
}

3.2 Main 函数主循环的涅槃重构

既然没有了上帝视角的 consensus_complete_event,我们的主线程 main 怎么知道仿真结束了? 答案是:等待超时 (Timeout),或者在外部做一个监控线程不断轮询各自的状态(类似 Prometheus 拉取监控指标)。

我们将原主循环改为更硬核的线程汇合 (Join) 与状态探针机制:

int main() {
    // ... 前置初始化与创建线程 (同原代码,但移除了 global_cs) ...

    printf("--- 硬核去中心化 PBFT 仿真启动 ---\n");
    printf("主节点 Node 0 即将发起提案...\n");
    
    // 给系统 3 秒钟的时间去网状通信、达成共识
    // 真实场景下,节点是永不退出的死循环。
    Sleep(3000); 

    // --- 监控探针阶段 (取代上帝视角的暴力判断) ---
    printf("\n=== 3秒通信时间到,主线程作为外部监控开始探针审计 ===\n");
    printf("\n%-8s | %-10s | %-12s | %-10s | %-10s\n", 
           "节点ID", "身份", "最终状态", "Prepare票", "Commit票");
    printf("------------------------------------------------------------\n");

    int honest_committed_count = 0;

    for (int i = 0; i < N_NODES; i++) {
        int prep_total = 0, comm_total = 0;
        for (int j = 0; j < N_NODES; j++) {
            prep_total += nodes[i].prepare_count[j];
            comm_total += nodes[i].commit_count[j];
        }

        char *node_type = (nodes[i].type == BYZANTINE) ? "★拜占庭" : "诚实节点";
        char *state_str = "未达共识";
        
        if (nodes[i].state == COMMITTED) {
            state_str = "已写入账本";
            if (nodes[i].type == HONEST) honest_committed_count++;
        }

        printf("Node %02d   | %-10s | %-12s | %-10d | %-10d\n",
               i, node_type, state_str, prep_total, comm_total);
    }

    printf("------------------------------------------------------------\n");
    
    // 最终业务层研判:只要大多数诚实节点(至少 f+1)提交了,系统就是安全的
    if (honest_committed_count >= (F_TOLERANCE + 1)) {
        printf(">> 结论:系统在遭受拜占庭攻击下,【成功】保住了共识一致性!\n");
    } else {
        printf(">> 结论:系统被攻破或发生分叉!\n");
    }

    // 强杀模拟线程,清理资源
    // ...
    return 0;
}

四、 总结与升华:从 C 语言到工业级架构

经过上、下两篇的彻底爆改,这份原本勉强能跑的 AI 代码,已经被我们改造出了工业级底层的雏形:

核心重构对照表

维度

AI 初始版本 (大作业水平)

咱们的爆改版本 (资深开发水平)

内存与性能

O(N) 数组平移,CPU浪费严重

O(1) 环形缓冲区,极限压榨CPU性能

并发模型

带有上帝视角的全局锁,伪分布式

全异步事件驱动 Reactor模型,彻底去中心化

拜占庭逻辑

仅模拟断网丢包 (CFT)

注入两面三刀 (Equivocation) 攻击逻辑,真BFT压测

防重放攻击

代码已有体现 (count[sender]=1)

深入讲解了为何不用 ++ 而用 = 的幂等性防刷票设计

最后,致各位正在啃底层、刷算法的兄弟们: 很多人觉得区块链全是发币骗人的,但剥去金融的外衣,像 PBFT、Raft、Paxos 这样的分布式共识算法,是目前计算机科学领域最耀眼的明珠之一。

当你用 C 语言的指针、锁、状态机亲手把它们拼装起来,看着线程在并发的混乱中,通过精妙的投票数学($3f+1$)最终走向秩序和统一时,那种作为系统架构师的掌控感和爽感,是写多少个增删改查业务都换不来的!

这篇万字拆解呕心沥血,图表、代码全手打。如果你能看到这里,说明你绝对是个追求极致的技术极客!!!

附录,300行源码:

#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>

#define N_NODES 7 // Reduced to 7 nodes for better testing (f=2, quorum=5)
#define F_TOLERANCE 2
#define QUORUM 5
#define MIN_LATENCY 10 // Reduced delay for faster execution
#define MAX_LATENCY 50
#define BYZANTINE_NODES 1 // Only 1 Byzantine node

// Message types
typedef enum
{
    PRE_PREPARE,
    PREPARE,
    COMMIT
} MsgType;

// Node types
typedef enum
{
    HONEST,
    BYZANTINE
} NodeType;

// Message structure
typedef struct
{
    int sender;
    int target;
    MsgType type;
    int sequence;
} Message;

// Node states
typedef enum
{
    IDLE,
    PRE_PREPARED,
    PREPARED,
    COMMITTED
} NodeState;

// Node structure
typedef struct
{
    int id;
    NodeType type;
    NodeState state;
    int sequence;
    int prepare_count[N_NODES];
    int commit_count[N_NODES];
    BOOL has_sent_prepare;
    BOOL has_sent_commit;
    CRITICAL_SECTION msg_cs;
    HANDLE msg_queue_event;
    Message msg_queue[50];
    int msg_queue_size;
} Node;

// Global variables
Node nodes[N_NODES];
CRITICAL_SECTION global_cs;
HANDLE consensus_complete_event;
BOOL consensus_reached = FALSE;
int total_messages_sent = 0;

// Simulate network delay
void simulate_network_delay()
{
    int delay = MIN_LATENCY + rand() % (MAX_LATENCY - MIN_LATENCY);
    if ((rand() % 100) < 10)
    { // 10% chance of extra delay
        delay += 100;
    }
    Sleep(delay);
}

// Send message to specific node
void send_message_to_node(int sender, int target, MsgType type, int sequence)
{
    if (target < 0 || target >= N_NODES)
        return;

    // Byzantine nodes may drop or corrupt messages
    if (nodes[sender].type == BYZANTINE)
    {
        if (rand() % 100 < 30)
        { // 30% chance to drop message
            return;
        }
    }

    simulate_network_delay();

    EnterCriticalSection(&nodes[target].msg_cs);
    if (nodes[target].msg_queue_size < 50)
    {
        nodes[target].msg_queue[nodes[target].msg_queue_size].sender = sender;
        nodes[target].msg_queue[nodes[target].msg_queue_size].target = target;
        nodes[target].msg_queue[nodes[target].msg_queue_size].type = type;
        nodes[target].msg_queue[nodes[target].msg_queue_size].sequence = sequence;
        nodes[target].msg_queue_size++;
        SetEvent(nodes[target].msg_queue_event);

        EnterCriticalSection(&global_cs);
        total_messages_sent++;
        LeaveCriticalSection(&global_cs);
    }
    LeaveCriticalSection(&nodes[target].msg_cs);
}

// Broadcast message to all nodes
void broadcast_message(int sender, MsgType type, int sequence)
{
    for (int i = 0; i < N_NODES; i++)
    {
        if (i != sender)
        {
            send_message_to_node(sender, i, type, sequence);
        }
    }
}

// Handle Pre-prepare message
void handle_pre_prepare(Node *node, Message *msg)
{
    if (node->type == BYZANTINE)
    {
        if (rand() % 100 < 40)
        {
            return;
        }
    }

    if (msg->sequence == node->sequence)
    {
        node->state = PRE_PREPARED;

        if (!node->has_sent_prepare)
        {
            node->has_sent_prepare = TRUE;
            broadcast_message(node->id, PREPARE, msg->sequence);
        }
    }
}

// Handle Prepare message
void handle_prepare(Node *node, Message *msg)
{
    if (node->type == BYZANTINE)
    {
        if (rand() % 100 < 20)
        {
            return;
        }
    }

    if (msg->sequence == node->sequence)
    {
        node->prepare_count[msg->sender] = 1;

        int prepare_total = 0;
        for (int i = 0; i < N_NODES; i++)
        {
            prepare_total += node->prepare_count[i];
        }

        if (prepare_total >= QUORUM && node->state == PRE_PREPARED)
        {
            node->state = PREPARED;

            if (!node->has_sent_commit)
            {
                node->has_sent_commit = TRUE;
                broadcast_message(node->id, COMMIT, msg->sequence);
            }
        }
    }
}

// Handle Commit message
void handle_commit(Node *node, Message *msg)
{
    if (node->type == BYZANTINE)
    {
        if (rand() % 100 < 10)
        {
            return;
        }
    }

    if (msg->sequence == node->sequence)
    {
        node->commit_count[msg->sender] = 1;

        int commit_total = 0;
        for (int i = 0; i < N_NODES; i++)
        {
            commit_total += node->commit_count[i];
        }

        if (commit_total >= QUORUM && node->state == PREPARED)
        {
            node->state = COMMITTED;
            printf("[Node %02d] Consensus reached! Sequence: %d\n", node->id, node->sequence);

            EnterCriticalSection(&global_cs);
            if (!consensus_reached)
            {
                consensus_reached = TRUE;
                SetEvent(consensus_complete_event);
            }
            LeaveCriticalSection(&global_cs);
        }
    }
}

// Node main thread
DWORD WINAPI NodeThread(LPVOID lpParam)
{
    Node *node = (Node *)lpParam;

    // Primary node (node 0) initiates the request
    if (node->id == 0 && node->type == HONEST)
    {
        Sleep(50); // Wait for other nodes to start
        printf("Primary Node 00 initiating proposal (Pre-prepare phase)...\n\n");
        broadcast_message(0, PRE_PREPARE, node->sequence);
    }

    // Message processing loop
    while (WaitForSingleObject(consensus_complete_event, 50) == WAIT_TIMEOUT)
    {
        // self注释:外层循环作用:循环等待consensus_complete_event,如果consensus_complete_event没有被触发,则进入内层循环,处理消息。一旦事件被触发,则结束外层循环,进入内层循环。

        WaitForSingleObject(node->msg_queue_event, 50);
        // 内层的循环:循环等待节点自己的消息msg_queue,如果msg_queue有消息,则进入内层循环,处理消息。

        EnterCriticalSection(&node->msg_cs);
        if (node->msg_queue_size > 0)
        {
            Message msg = node->msg_queue[0];
            for (int i = 0; i < node->msg_queue_size - 1; i++)
            {
                node->msg_queue[i] = node->msg_queue[i + 1];
            }
            node->msg_queue_size--;
            ResetEvent(node->msg_queue_event);
            LeaveCriticalSection(&node->msg_cs);

            switch (msg.type)
            {
            case PRE_PREPARE:
                handle_pre_prepare(node, &msg);
                break;
            case PREPARE:
                handle_prepare(node, &msg);
                break;
            case COMMIT:
                handle_commit(node, &msg);
                break;
            default:
                break;
            }
        }
        else
        {
            LeaveCriticalSection(&node->msg_cs);
        }
    }

    return 0;
}

int main()
{
    srand(time(NULL));
    InitializeCriticalSection(&global_cs);
    consensus_complete_event = CreateEvent(NULL, TRUE, FALSE, NULL);

    printf("--- Optimized PBFT Simulation Started ---\n");
    printf("Nodes: %d, Fault Tolerance: %d, Byzantine Nodes: %d\n",
           N_NODES, F_TOLERANCE, BYZANTINE_NODES);

    // Initialize all nodes
    for (int i = 0; i < N_NODES; i++)
    {
        nodes[i].id = i;
        // Make sure primary node (node 0) is always honest
        nodes[i].type = (i == 0) ? HONEST : (i <= BYZANTINE_NODES) ? BYZANTINE
                                                                   : HONEST;
        nodes[i].state = IDLE;
        nodes[i].sequence = 1;
        nodes[i].has_sent_prepare = FALSE;
        nodes[i].has_sent_commit = FALSE;
        nodes[i].msg_queue_size = 0;
        InitializeCriticalSection(&nodes[i].msg_cs);
        nodes[i].msg_queue_event = CreateEvent(NULL, FALSE, FALSE, NULL);
        memset(nodes[i].prepare_count, 0, sizeof(nodes[i].prepare_count));
        memset(nodes[i].commit_count, 0, sizeof(nodes[i].commit_count));
    }

    // Create threads for all nodes
    HANDLE threads[N_NODES];
    for (int i = 0; i < N_NODES; i++)
    {
        threads[i] = CreateThread(NULL, 0, NodeThread, &nodes[i], 0, NULL);
    }

    // Wait for consensus or timeout (5 seconds)
    if (WaitForSingleObject(consensus_complete_event, 5000) == WAIT_OBJECT_0)
    {
        printf("\n=== CONSENSUS SUCCESSFULLY REACHED! ===\n");
    }
    else
    {
        printf("\n=== CONSENSUS TIMEOUT - FAILED TO REACH AGREEMENT ===\n");
    }

    // Wait for all threads to finish
    WaitForMultipleObjects(N_NODES, threads, TRUE, 1000);

    // Print statistics
    printf("\n%-8s | %-10s | %-12s | %-10s | %-10s\n", "NodeID", "Type", "State", "PrepCount", "CommCount");
    printf("------------------------------------------------------------\n");

    for (int i = 0; i < N_NODES; i++)
    {
        int prep_total = 0, comm_total = 0;
        for (int j = 0; j < N_NODES; j++)
        {
            prep_total += nodes[i].prepare_count[j];
            comm_total += nodes[i].commit_count[j];
        }

        char *node_type = (nodes[i].type == BYZANTINE) ? "Byzantine" : "Honest";
        char *state_str;
        switch (nodes[i].state)
        {
        case IDLE:
            state_str = "Idle";
            break;
        case PRE_PREPARED:
            state_str = "PrePrep";
            break;
        case PREPARED:
            state_str = "Prepared";
            break;
        case COMMITTED:
            state_str = "Committed";
            break;
        default:
            state_str = "Unknown";
            break;
        }

        printf("Node %02d   | %-10s | %-12s | %-10d | %-10d\n",
               i, node_type, state_str, prep_total, comm_total);
    }

    printf("------------------------------------------------------------\n");
    printf("Total Network Messages: %d\n", total_messages_sent);

    // Cleanup
    CloseHandle(consensus_complete_event);
    for (int i = 0; i < N_NODES; i++)
    {
        DeleteCriticalSection(&nodes[i].msg_cs);
        CloseHandle(nodes[i].msg_queue_event);
    }
    DeleteCriticalSection(&global_cs);

    return 0;
}

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐