单 Epoll 多线程 IO 模型
最近在使用 Folly 的协程做 RPC 框架,遇到一些问题:
- Folly 的 EventBase 与 AsyncSocket 是绑定的,Socket 连接建立后不能再跨线程操作
- Folly 的 IOThreadPoolExecutor 执行协程的
co_await
操作时,会出现线程切换的情况
这就导致了目前只能使用单线程的 IOThreadPoolExecutor 执行协程中的 IO 操作。即使修复了这个问题,每个线程一个 EventBase 也会限制对应连接的 IO 能力。好在目前的 RPC 框架中已经使用队列剥离了 IO 操作和请求处理,请求处理阶段依然可以使用 Folly 的协程框架,IO 部分可以替换为无协程的实现。为了尽可能地提升性能,笔者自己造了一个单 Epoll 多线程 IO 的轮子,供参考。
设计目标
对于这个 IO 模型,笔者计划的目标是:
- 单 Epoll,只负责处理可读/可写事件,不负责具体的 IO 操作
- 多线程,负责并发地处理 Epoll 丢过来的具体 IO 操作任务
- 支持并发读写同一个连接,读写操作均 Lock-free
Epoll 事件触发时机
先看一个 Epoll 边缘触发的例子:
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <atomic>
#include <thread>
#define CHECK(expr) \
do { \
int ret = (expr); \
if (ret != 0) { \
printf("%d error %d\n", __LINE__, ret); \
return ret; \
} \
} while (0);
constexpr uint16_t port = 8000;
constexpr uint32_t maxevents = 32;
std::atomic<bool> quit{false};
int server() {
struct sockaddr_in serv_addr;
socklen_t serv_len = sizeof(serv_addr);
int lfd = ::socket(AF_INET, SOCK_STREAM, 0);
CHECK(::fcntl(lfd, F_SETFL, ::fcntl(lfd, F_GETFL) | O_NONBLOCK));
int value = 1;
CHECK(::setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(int)));
CHECK(::setsockopt(lfd, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(int)));
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(port);
CHECK(::bind(lfd, (struct sockaddr *)&serv_addr, serv_len));
CHECK(::listen(lfd, 64));
int epfd = ::epoll_create(maxevents);
struct epoll_event ev {
EPOLLIN | EPOLLET, { .fd = lfd }
};
CHECK(::epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev));
while (true) {
struct epoll_event events[maxevents];
int ret = ::epoll_wait(epfd, events, maxevents, -1);
CHECK(ret == -1 ? errno : 0);
if (quit.load()) {
break;
}
for (int i = 0; i < ret; ++i) {
int fd = events[i].data.fd;
if (fd == lfd) {
struct sockaddr_in client_addr;
socklen_t cli_len = sizeof(client_addr);
int cfd = ::accept(lfd, (struct sockaddr *)&client_addr, &cli_len);
CHECK(cfd == -1 ? errno : 0);
int flags = ::fcntl(cfd, F_GETFL, 0);
CHECK(flags == -1 ? errno : 0);
CHECK(::fcntl(cfd, F_SETFL, (flags | O_NONBLOCK)));
struct epoll_event evt {
(EPOLLIN | EPOLLOUT | EPOLLET), { .fd = cfd }
};
CHECK(::epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &evt));
continue;
}
int e = events[i].events;
bool in = e & EPOLLIN;
bool out = e & EPOLLOUT;
printf("fd: %d, events %d, epoll in %d, epoll out %d\n", fd, e, in, out);
}
}
::close(epfd);
::close(lfd);
return 0;
}
int client() {
struct sockaddr_in cli_addr;
socklen_t cli_len = sizeof(cli_addr);
memset(&cli_addr, 0, sizeof(cli_addr));
cli_addr.sin_family = AF_INET;
cli_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
cli_addr.sin_port = htons(port);
int fd = ::socket(AF_INET, SOCK_STREAM, 0);
CHECK(::connect(fd, (struct sockaddr *)&cli_addr, cli_len));
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(::write(fd, "write", 5) - 5);
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(::write(fd, "write", 5) - 5);
std::this_thread::sleep_for(std::chrono::seconds(1));
quit = true;
CHECK(::close(fd));
return 0;
}
int main() {
std::thread svr(server);
std::thread cli(client);
cli.join();
svr.join();
return 0;
}
从上面的例子得到的结论是,当 fd 发生以下几种状态变化时, epoll_wait
会返回该 fd 所有关注的事件。
- 由不可读变为可读
- 由不可写变为可写
- 可读状态下继续收到新数据
进而,当 fd 可读或可写时,Epoll 也可能会重复的报告该事件。
可靠地处理读事件
设计目标中希望 Epoll 收到可读事件后,打包成 IO 任务丢给线程池。但 Epoll 可能会重复的报告可读事件,此时可能有正在执行的读操作,造成竞争。这也就需要一种机制,保证可读事件出现时,有且仅有一个正在执行的读操作,完成所有可读事件的处理。
加锁非吾所愿。分析一下当前的需求:
- 当 Epoll 收到可读事件时,如果当前还没有正在执行的读操作,则尝试启动一个;否则想办法通知该任务有新的可读事件出现
- 当读操作返回 EAGAIN 时,如果当前没有新的可读事件出现,则结束;否则重新打包一个读任务,同步或异步地去执行
该需求可以通过原子量完成,需要使用两个 Bit 位,分别命名为 Reading 和 NewEvent。Reading 为 1 时表示当前已经有读操作启动,NewEvent 为 1 时表示有新的可读事件出现。在可读事件出现时,同时打上 Reading 和 NewEvent 标记,如果此前没有 Reading 标记,则启动一个新的读任务;读任务启动时去除 NewEvent 标记,而后持续进行读操作直至返回 EAGAIN
,此时检查是否有 NewEvent 标记,如果有则去除该标记继续重试,否则尝试 CAS 去除 Reading 标记,如果成功则该读任务成功退出。这样可以保证总会有一个正在执行或待执行的读任务处理新出现的读事件。
// Epoll thread
if (events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
auto flags = socket->flags.fetch_or(ReadingFlag | NewEventFlag);
if ((flags & ReadingFlag) == 0) {
socket->start_read_task();
}
}
// Read thread
void read_task(Socket *socket) {
socket->flags &= ~NewEventFlag;
while (true) {
int ret = ::read(socket->fd, buff, size);
if (ret > 0) {
// ...
} else if (ret == 0) {
// closed.
} else if (errno == EAGAIN) {
auto flags = socket->flags.load(std::memory_order_acquire);
while (true) {
if (flags & NewEventFlag) {
socket->flags &= ~NewEventFlag;
// 当前有新的可读事件出现,可以直接重试,或者重新打包一个读任务丢给线程池
// ...
}
auto newFlags = flags & ~ReadingFlag.
if (socket->flags.compare_exchange_strong(flags, newFlags)) {
// CAS 成功,去除了 ReadingFlag,可以成功退出
}
// 如果失败,则继续重试
}
} else {
// error.
}
}
}
可靠地处理写事件
与读事件的处理略微有些不同,写事件的处理要麻烦一些。当写入队列中有数据、且 fd 可写时,就需要有且仅有一个写任务处理所有的写入。也就说,当前没有写任务的前提下,会有两种可能的启动写任务的场景:
- fd 当前可写,写入队列中首次加入数据
- 写入队列中有数据,fd 首次由不可写转为可写
写入任务退出也有两种场景:
- 写入队列写空
- fd 不可写
bRPC 是这样处理上述需求的:
- 写入队列首次加入数据时,启动写任务;写入队列中已经有数据时,则等待已有的写任务完成所有的写入
- 当 fd 不可写时,再将对应的 EPOLLOUT 事件增加到 Epoll 的关注列表中,待 fd 可写时,将该关注移除,恢复写任务
- 只有写入队列写完时,写入任务才会退出
bRPC 的处理是完备的,但可能需要频繁地操作 Epoll,这也是非吾所愿。笔者这里仍然依靠原子量实现写任务的启动与退出。这里需要使用四个 Bit 位,使用 Writable 和 NewEpollOut 表示当前 fd 是否可写、是否有新的可写入事件,使用 HasMsg 和 NewMsg 表示当前写入队列是否有数据、是否有新的数据插入队列。当且仅当 Writable 和 HasMsg 首次同时变为 1 时才启动写入任务。当写入队列写完时,尝试去除 HasMsg 和 NewMsg 标记;当 fd 不可写时,尝试去除 Writable 和 NewEpollOut 标记;当 Epoll 收到新的可写入事件时,仍然使用 fetch_or
打上可写入标记,并判断是否启动新的写入任务。事实上 Epoll 收到可读可写事件时,可以一次性地完成 fetch_or
打标记的任务。
相关链接
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
下一篇: 不要相信一个熬夜的人说的每一句话
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论