从零学会epoll的使用和原理

第一步:理解 select / poll 的缺陷

一、select 和 poll 是什么?

它们是 Linux 提供的 I/O 多路复用机制,可以让我们同时监听多个文件描述符(fd),比如 socket,来等待“是否有数据可以读/写”。


二、select 工作流程(伪代码)

fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sockfd, &readfds);

select(maxfd + 1, &readfds, NULL, NULL, NULL);

if (FD_ISSET(sockfd, &readfds)) {
    // 有数据可读
}

三、poll 工作流程(伪代码)

struct pollfd fds[1024];
poll(fds, nfds, timeout);

四、select 和 poll 的主要缺陷

缺陷点 描述
1. fd 数量限制 select 的 fd 上限是 1024(因为用的是 bitmap)
2. 每次都要传入整个 fd 集合 调用时都需要将所有监听的 fd 从用户态拷贝到内核态
3. 事件通知不高效 select/poll 会遍历所有 fd,查找哪一个就绪,O(n) 时间复杂度
4. 无状态 每次调用都要重新设置监听 fd 的集合,没法复用
5. 边缘触发支持差 不支持高效的边缘触发,只能是水平触发(LT)

📌 总结一句话:

select/poll 太“啰嗦”和“笨重”,当连接数上千上万时,它们效率非常低,而 epoll 专为这种高并发场景优化。


select 示例代码(监听 stdin 和 socket)

#include <iostream>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    bind(listen_fd, (sockaddr*)&addr, sizeof(addr));
    listen(listen_fd, 5);

    std::cout << "Listening on 127.0.0.1:8888...\n";

    fd_set readfds;
    int max_fd = std::max(listen_fd, STDIN_FILENO); // 最大 fd

    while (true) {
        FD_ZERO(&readfds);
        FD_SET(STDIN_FILENO, &readfds);   // 标准输入
        FD_SET(listen_fd, &readfds);      // 监听 socket

        int ready = select(max_fd + 1, &readfds, nullptr, nullptr, nullptr);
        if (ready == -1) {
            perror("select");
            break;
        }

        if (FD_ISSET(STDIN_FILENO, &readfds)) {
            char buf[1024] = {};
            read(STDIN_FILENO, buf, sizeof(buf));
            std::cout << "[STDIN] 输入了: " << buf;
        }

        if (FD_ISSET(listen_fd, &readfds)) {
            sockaddr_in client{};
            socklen_t len = sizeof(client);
            int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);
            std::cout << "[SOCKET] 新连接来自: " << inet_ntoa(client.sin_addr) << ":" << ntohs(client.sin_port) << "\n";
            close(conn_fd);
        }
    }

    close(listen_fd);
    return 0;
}

🔍 如何运行这个 demo:

  1. 编译:
g++ select_demo.cpp -o select_demo
  1. 运行:
./select_demo
  1. 打开另一个终端连接:
telnet 127.0.0.1 8888
  1. 或者在当前终端直接输入一些文字,它会 echo 出来。

📌 你能观察到什么?

  • 每次 select() 都要重新设置 fd_set
  • 没有事件时程序就会阻塞在 select()
  • 随着连接越来越多,你会发现效率会下降。

第二阶段:epoll 的基本使用

一、epoll 的工作流程(核心三步)

epoll 的使用核心是三步:

步骤 函数名 作用
① 创建 epoll 对象 epoll_create1() 创建 epoll 文件描述符
② 注册事件 epoll_ctl() 将监听的 fd 注册到 epoll 实例
③ 等待事件 epoll_wait() 阻塞等待就绪事件,返回活跃的 fd

二、epoll 的基本代码结构(伪代码)

int epfd = epoll_create1(0);   // 创建 epoll 实例

epoll_event ev;
ev.events = EPOLLIN;          // 关注读事件
ev.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);  // 添加 fd 到 epoll

epoll_event events[1024];     // 返回的事件数组
int n = epoll_wait(epfd, events, 1024, -1);       // 阻塞直到有事件发生

for (int i = 0; i < n; ++i) {
    if (events[i].data.fd == listen_fd) {
        accept();  // 新连接
    } else {
        read();    // 读数据
    }
}

三、epoll 示例:监听 stdin 和 socket(对比 select)

我们来把刚才 select 的例子,用 epoll 重写一遍

#include <iostream>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    bind(listen_fd, (sockaddr*)&addr, sizeof(addr));
    listen(listen_fd, 5);
    std::cout << "Listening on 127.0.0.1:8888...\n";

    int epfd = epoll_create1(0);  // 创建 epoll 对象

    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);  // 添加监听 fd

    ev.data.fd = STDIN_FILENO;
    epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);  // 添加 stdin

    epoll_event events[1024];

    while (true) {
        int n = epoll_wait(epfd, events, 1024, -1);
        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;
            if (fd == listen_fd) {
                sockaddr_in client{};
                socklen_t len = sizeof(client);
                int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);
                std::cout << "[SOCKET] 新连接来自: " << inet_ntoa(client.sin_addr) << ":" << ntohs(client.sin_port) << "\n";
                close(conn_fd);
            } else if (fd == STDIN_FILENO) {
                char buf[1024] = {};
                read(STDIN_FILENO, buf, sizeof(buf));
                std::cout << "[STDIN] 输入了: " << buf;
            }
        }
    }

    close(listen_fd);
    close(epfd);
    return 0;
}

四、为什么 epoll 更优秀?

比较项 select/poll epoll
fd 数量上限 select: 1024,poll 无限制但效率低 理论上无上限,效率高
是否复用 fd 集合 否,每次都要传 是,注册一次后直接等待
是否遍历所有 fd 是(每次都查) 否(事件就绪才通知)
通知机制 轮询 事件驱动

第三阶段:理解 ET(边缘触发)与非阻塞 I/O 的配合

这部分是 epoll 的精髓,也是它比 select 更高效的关键所在。我们先来搞清楚 LT vs ET,然后说非阻塞,再用代码加深理解。


一、什么是 LT 和 ET?

epoll 支持两种事件触发模式:

模式 名称 特点 默认值
LT Level Trigger(水平触发) 只要条件满足(比如有数据),每次 epoll_wait 都会返回这个事件 ✅ 默认
ET Edge Trigger(边缘触发) 只有状态从无到有变化时,才触发事件通知,只通知一次 ❌ 不是默认,需要手动设置

二、简单对比:LT vs ET

假设 socket 缓冲区有数据:

  • LT 模式:
    • epoll_wait 每次都会告诉你“有数据!”,直到你读完。
  • ET 模式:
    • 只在数据第一次到达时通知你一次,“之后不管了”。
    • 如果你没一次性读完所有数据,就会“丢事件”,程序卡住。

三、为什么 ET 更快?

因为:

  • 内核只在状态变化时通知,不会反复通知同一个事件;
  • 节省系统调用时间(高并发下效果特别明显);
  • 但是使用更复杂——必须配合非阻塞 I/O

四、非阻塞 I/O 是什么?

默认情况下,socket 是阻塞的:

char buf[1024];
read(fd, buf, sizeof(buf)); // 如果没数据,会卡住等

非阻塞模式下,read 立刻返回:

fcntl(fd, F_SETFL, O_NONBLOCK); // 设置非阻塞
  • 有数据就读
  • 没数据就返回 -1,errno=EAGAIN / EWOULDBLOCK

五、ET + 非阻塞 I/O 的典型套路:

while (true) {
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) break; // 没数据可读
        else perror("read error");
    } else if (n == 0) {
        // 对方关闭连接
        break;
    } else {
        // 正常读取数据
    }
}

六、设置 ET 模式和非阻塞 socket 的完整代码片段

// 设置 fd 为非阻塞
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);

// 注册 ET 模式
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

七、ET 模式小结

问题 答案
必须配合非阻塞? ✅ 是的,不然容易卡死
要用循环读数据? ✅ 是的,直到返回 EAGAIN
什么时候触发? 数据从没有 ➜ 有,才触发
好处? 高性能,减少系统调用
风险? 没处理好容易漏事件、阻塞

巩固下 ET 和非阻塞的相关细节

ET + 非阻塞 I/O 是 epoll 中最容易出 bug 的地方,

一、非阻塞 I/O 设置方式

我们经常对 socket 设置非阻塞,代码如下:

#include <fcntl.h>

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

📌 重点解释:

  • O_NONBLOCK 是一个 flag,它告诉内核这个 fd 不允许阻塞。
  • fcntl 拿到当前 flags,再 OR 一个非阻塞标志。

二、ET 模式触发行为再强化

我们看一个常见现象来理解 ET 和阻塞 read 的冲突

❌ 错误代码(阻塞读配合 ET)
// 假设这是 ET 模式下的回调处理
char buf[1024];
read(fd, buf, sizeof(buf));  // 没读完就等死了!

如果缓冲区里数据不够 1024 字节,这个 read 会阻塞——
ET 只通知一次! 你程序就卡死在 read 上,永远等不到下一次触发。


三、正确姿势:循环读取直到 EAGAIN

char buf[1024];
while (true) {
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // 没有数据了,结束循环
            break;
        } else {
            perror("read error");
            break;
        }
    } else if (n == 0) {
        // 对方关闭连接
        printf("Client closed connection\n");
        close(fd);
        break;
    } else {
        // 正常读取数据
        printf("Received: %.*s", (int)n, buf);
    }
}

📌 关键点总结:

行为 必须这么做的原因
循环读取 ET 只触发一次,要把所有数据读干净
判断 errno 确认是“真的没数据”而不是其他错误
处理 n == 0 表示连接断开,必须 close(fd)

四、添加客户端连接时也要设置非阻塞!

int connfd = accept(listen_fd, ...);
set_nonblocking(connfd);  // 否则 read 可能阻塞

五、epoll 注册事件时需要设置 EPOLLET

epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

epoll + ET + 非阻塞 I/O 的最小完整 demo

场景功能说明:

  • 使用 epoll 的 ET 模式监听所有 socket。
  • 所有 fd 设置为非阻塞。
  • 客户端发送数据,服务端完整读完并打印。
  • 使用 read() + EAGAIN 机制读取所有数据。

代码:epoll_et_server.cpp

#include <iostream>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    bind(listen_fd, (sockaddr*)&addr, sizeof(addr));
    listen(listen_fd, SOMAXCONN);
    set_nonblocking(listen_fd);

    int epfd = epoll_create1(0);

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;  // 注意:ET 模式
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    std::vector<epoll_event> events(1024);

    std::cout << "Server listening on 127.0.0.1:8888\n";

    while (true) {
        int n = epoll_wait(epfd, events.data(), events.size(), -1);
        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;

            if (fd == listen_fd) {
                // 处理所有新连接
                while (true) {
                    sockaddr_in client{};
                    socklen_t len = sizeof(client);
                    int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);
                    if (conn_fd == -1) break;

                    std::cout << "[New Connection] "
                              << inet_ntoa(client.sin_addr) << ":" << ntohs(client.sin_port) << "\n";

                    set_nonblocking(conn_fd);
                    epoll_event cev{};
                    cev.events = EPOLLIN | EPOLLET;
                    cev.data.fd = conn_fd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &cev);
                }
            } else {
                // 处理已有连接的读事件
                while (true) {
                    char buf[1024];
                    ssize_t count = read(fd, buf, sizeof(buf));
                    if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // 没数据可读了
                            break;
                        } else {
                            perror("read");
                            close(fd);
                            break;
                        }
                    } else if (count == 0) {
                        // 对方关闭连接
                        std::cout << "[Disconnect] fd = " << fd << "\n";
                        close(fd);
                        break;
                    } else {
                        std::cout << "[Read] fd = " << fd << ", content: "
                                  << std::string(buf, count);
                    }
                }
            }
        }
    }

    close(listen_fd);
    close(epfd);
    return 0;
}

编译 & 运行方式

g++ epoll_et_server.cpp -o epoll_et_server
./epoll_et_server

打开另一个终端:

telnet 127.0.0.1 8888
# 或用 nc 测试:
# echo "hello" | nc 127.0.0.1 8888

提示:

  • 你可以反复 telnet 连多个客户端;
  • 输入数据,服务端会完整打印;
  • 断开连接也能被正常检测并关闭;

🔥 第四阶段:写一个真正的高并发服务器,支持长连接、广播、线程池等!

🧩 项目目标功能

  • ✅ 基于 epoll 的 I/O 多路复用
  • ✅ 使用 ET 模式 + 非阻塞 I/O
  • ✅ 支持多个客户端的 长连接
  • ✅ 实现 客户端广播 功能(群发消息)
  • ✅ 使用 线程池 解耦 I/O 与业务处理

🚧 模块化开发路线图(按顺序完成)

🔹 第一步:网络通信模块(epoll + 非阻塞长连接管理)
  • 接受新连接
  • 使用 epoll 管理所有 socket
  • 使用 ET 模式读取客户端数据
🔹 第二步:线程池模块
  • 预先创建线程池(工作线程 + 任务队列)
  • 支持任务提交与消费
🔹 第三步:广播功能模块
  • 所有客户端连接都被管理起来
  • 某个客户端发消息 ➜ 广播给所有连接
🔹 第四步:服务端结构模块化 & 重构
  • 将各部分封装为类(Server / ThreadPool / Connection 等)
  • 保持逻辑清晰、模块解耦

我们现在从第一步开始:

📦 第一步:网络通信模块(只实现接收客户端连接 & 打印数据)

目标:

  • 使用 epoll + ET + 非阻塞 监听并读取每个连接的数据。
  • 支持多个客户端长连接、持续交互。
// server.cpp
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <vector>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8888);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_fd, SOMAXCONN);
    set_nonblocking(listen_fd);

    int epfd = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    std::vector<epoll_event> events(1024);

    std::cout << "[INFO] Server started on port 8888" << std::endl;

    while (true) {
        int n = epoll_wait(epfd, events.data(), events.size(), -1);
        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;

            if (fd == listen_fd) {
                // 接收所有连接
                while (true) {
                    sockaddr_in client{};
                    socklen_t len = sizeof(client);
                    int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);
                    if (conn_fd == -1) break;

                    set_nonblocking(conn_fd);

                    epoll_event conn_ev{};
                    conn_ev.events = EPOLLIN | EPOLLET;
                    conn_ev.data.fd = conn_fd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &conn_ev);

                    std::cout << "[CONNECT] New client: "
                              << inet_ntoa(client.sin_addr) << ":"
                              << ntohs(client.sin_port) << std::endl;
                }
            } else {
                // 读取数据
                while (true) {
                    char buf[1024];
                    ssize_t count = read(fd, buf, sizeof(buf));
                    if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                        perror("read error");
                        close(fd);
                        break;
                    } else if (count == 0) {
                        std::cout << "[DISCONNECT] fd: " << fd << std::endl;
                        close(fd);
                        break;
                    } else {
                        std::cout << "[DATA] fd " << fd << ": "
                                  << std::string(buf, count);
                    }
                }
            }
        }
    }

    close(epfd);
    close(listen_fd);
    return 0;
}



🔧 编译:
g++ server.cpp -o server
🚀 运行:
./server

然后另开几个终端:

telnet 127.0.0.1 8888
# 或用 nc 测试
# echo "hello" | nc 127.0.0.1 8888

🔹 第二步:线程池模块

// thread_pool.h
#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>

class ThreadPool {
public:
    using Task = std::function<void()>;

    explicit ThreadPool(size_t thread_count = 4)
        : stop_flag(false) {
        for (size_t i = 0; i < thread_count; ++i) {
            workers.emplace_back([this]() {
                while (true) {
                    Task task;
                    {
                        std::unique_lock<std::mutex> lock(mtx);
                        cv.wait(lock, [this]() { return stop_flag || !tasks.empty(); });

                        if (stop_flag && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            stop_flag = true;
        }
        cv.notify_all();
        for (auto& t : workers) {
            if (t.joinable()) t.join();
        }
    }

    void enqueue(Task task) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            tasks.push(std::move(task));
        }
        cv.notify_one();
    }

private:
    std::vector<std::thread> workers;
    std::queue<Task> tasks;
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<bool> stop_flag;
};

测试一下线程池

好的!我们来写一个简单的测试程序来验证这个 ThreadPool 是否正常工作。


示例:test_thread_pool.cpp
#include "thread_pool.h"
#include <iostream>
#include <chrono>

void test_task(int id) {
    std::cout << "[TASK START] Task " << id << " in thread "
              << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    std::cout << "[TASK END] Task " << id << " done\n";
}

int main() {
    ThreadPool pool(4);  // 启动4个线程

    for (int i = 0; i < 10; ++i) {
        pool.enqueue([i]() { test_task(i); });
    }

    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "[MAIN] Done\n";
    return 0;
}

🔧 编译:
g++ test_thread_pool.cpp -o test_thread_pool -std=c++11 -pthread

(注意加上 -pthread

🚀 运行:
./test_thread_pool

🔹 第三步:广播功能模块

// server.cpp (含线程池和广播结构)
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <vector>
#include <unordered_set>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "thread_pool.h"

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8888);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_fd, SOMAXCONN);
    set_nonblocking(listen_fd);

    int epfd = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    std::vector<epoll_event> events(1024);
    std::unordered_set<int> clients;
    ThreadPool pool(4);

    std::cout << "[INFO] Server started on port 8888" << std::endl;

    while (true) {
        int n = epoll_wait(epfd, events.data(), events.size(), -1);
        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;

            if (fd == listen_fd) {
                while (true) {
                    sockaddr_in client{};
                    socklen_t len = sizeof(client);
                    int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);
                    if (conn_fd == -1) break;

                    set_nonblocking(conn_fd);
                    epoll_event conn_ev{};
                    conn_ev.events = EPOLLIN | EPOLLET;
                    conn_ev.data.fd = conn_fd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &conn_ev);

                    clients.insert(conn_fd);

                    std::cout << "[CONNECT] "
                              << inet_ntoa(client.sin_addr) << ":"
                              << ntohs(client.sin_port)
                              << " (fd: " << conn_fd << ")" << std::endl;
                }
            } else {
                while (true) {
                    char buf[1024];
                    ssize_t count = read(fd, buf, sizeof(buf));
                    if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                        perror("read error");
                        close(fd);
                        clients.erase(fd);
                        break;
                    } else if (count == 0) {
                        std::cout << "[DISCONNECT] fd: " << fd << std::endl;
                        close(fd);
                        clients.erase(fd);
                        break;
                    } else {
                        std::string msg(buf, count);

                        // 异步处理(广播)
                        pool.enqueue([fd, msg, &clients]() {
                            std::string broadcast = "[Client " + std::to_string(fd) + "] " + msg;
                            for (int client_fd : clients) {
                                if (client_fd != fd) {
                                    send(client_fd, broadcast.c_str(), broadcast.size(), 0);
                                }
                            }
                            std::cout << "[BROADCAST from " << fd << "]: " << msg;
                        });
                    }
                }
            }
        }
    }

    close(epfd);
    close(listen_fd);
    return 0;
}

完成第三步 ✅:将线程池整合进服务器,并构建了广播功能的基础框架。


🧩 你现在拥有的功能:
  • 支持多个客户端 长连接
  • 客户端发送消息 ➜ 异步投递到线程池中处理
  • 消息被广播给所有其他客户端
  • 采用 epoll + ET + 非阻塞 I/O,高效并发

🚀 快速测试建议:
  1. 编译:
g++ server.cpp -o server -std=c++11 -pthread
  1. 运行服务端:
./server
  1. 用多个终端模拟客户端:
telnet 127.0.0.1 8888
# 或者 nc:
nc 127.0.0.1 8888

客户端 A 发消息 ➜ B、C 会收到。

模块封装重构(Server / Connection / ThreadPool 分离)

Server
// server.h
#pragma once
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <memory>
#include "thread_pool.h"
#include "connection.h"

class Server {
public:
    Server(int port = 8888);
    ~Server();
    void run();

private:
    void set_nonblocking(int fd);
    void handle_new_connection();
    void handle_client_event(int fd);

    int listen_fd_;
    int epoll_fd_;
    ThreadPool pool_;
    std::unordered_map<int, std::shared_ptr<Connection>> connections_;
    std::unordered_set<int> client_fds_;
};

// server.cpp
#include "server.h"
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <iostream>
#include <cstring>

Server::Server(int port) : pool_(4) {
    listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    bind(listen_fd_, (sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_fd_, SOMAXCONN);
    set_nonblocking(listen_fd_);

    epoll_fd_ = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listen_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd_, &ev);

    std::cout << "[INFO] Server started on port " << port << std::endl;
}

Server::~Server() {
    close(epoll_fd_);
    close(listen_fd_);
}

void Server::set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

void Server::run() {
    std::vector<epoll_event> events(1024);
    while (true) {
        int n = epoll_wait(epoll_fd_, events.data(), events.size(), -1);
        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;
            if (fd == listen_fd_) {
                handle_new_connection();
            } else {
                handle_client_event(fd);
            }
        }
    }
}

void Server::handle_new_connection() {
    while (true) {
        sockaddr_in client{};
        socklen_t len = sizeof(client);
        int conn_fd = accept(listen_fd_, (sockaddr*)&client, &len);
        if (conn_fd == -1) break;

        set_nonblocking(conn_fd);
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = conn_fd;
        epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, conn_fd, &ev);

        auto conn = std::make_shared<Connection>(conn_fd);
        connections_[conn_fd] = conn;
        client_fds_.insert(conn_fd);

        std::cout << "[CONNECT] fd: " << conn_fd << std::endl;
    }
}

void Server::handle_client_event(int fd) {
    auto conn = connections_[fd];
    bool closed = false;
    std::string msg = conn->read_data(closed);
    if (closed) {
        std::cout << "[DISCONNECT] fd: " << fd << std::endl;
        close(fd);
        epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
        connections_.erase(fd);
        client_fds_.erase(fd);
    } else {
        pool_.enqueue([fd, msg, this]() {
            std::string broadcast = "[Client " + std::to_string(fd) + "] " + msg;
            for (int client_fd : client_fds_) {
                if (client_fd != fd && connections_.count(client_fd)) {
                    connections_[client_fd]->send_data(broadcast);
                }
            }
            std::cout << "[BROADCAST from " << fd << "]: " << msg;
        });
    }
}

我们现在完成了服务端模块的完整封装 ✅:

✅ 模块结构现状:
文件名 说明
connection.h 封装客户端连接的读写逻辑
thread_pool.h 通用线程池,用于异步任务调度
server.h/cpp Server 类封装 epoll + 连接管理 + 线程池

📦 使用方式(主程序 main.cpp):
#include "server.h"

int main() {
    Server server(8888);
    server.run();
    return 0;
}

编译方式:

g++ main.cpp server.cpp -o server -std=c++11 -pthread

C++ 项目结构模板

🗂 项目结构建议
epoll_server/
├── CMakeLists.txt
├── src/
│   ├── main.cpp
│   ├── server.cpp
│   ├── server.h
│   ├── connection.h
│   └── thread_pool.h
├── build/         ← 编译输出目录(CMake 会自动创建)
└── README.md
📄 CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(epoll_server)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -pthread")

# 包含头文件目录
include_directories(${PROJECT_SOURCE_DIR}/src)

# 源文件
file(GLOB SOURCES src/*.cpp)

# 可执行文件
add_executable(server ${SOURCES})
📄 示例 src/main.cpp
#include "server.h"

int main() {
    Server server(8888);
    server.run();
    return 0;
}
🧪 构建和运行步骤:
cd epoll_server
mkdir -p build && cd build
cmake ..
make
./server
Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐