一般来说,现在大家 epoll 都是搭配着非阻塞 IO 一起用的,要问为什么?大家都这么用的啊,而且异步嘛,非阻塞嘛,很自然嘛
但是,非阻塞 IO 具体是怎么对 send 和 recv 起作用的。一般理解,我们之所以要用非阻塞,是为了避免这种情况:
客户端跟我们 tcp 三次握手完了,我们 listen fd 上得到一个 IN 事件了,然后 accept,得到一个新的 client fd,然后如果这个时候,客户端没有发字节流过来,我们调用 recv,就会因为服务器的系统缓冲区中空空如也,陷入阻塞,所以如果这里我们用上非阻塞,就可以让 recv 快速的返回一个 0,从而让 CPU 转而去处理其他 client fd,所以,非阻塞对 recv 的作用效果是很明显的。
但是 send 呢,即使是在阻塞情况下,我们调用 send,系统都是把我们的数据拷贝到系统缓冲区,然后就返回了,而并不需要等到这些数据真正发出来收到 ack 之后才返回,也就是说,这里纯粹是一个内存拷贝的动作,不涉及和远端机器的交互,照理说,应该是不会陷入等待的。那是否就能说阻塞非阻塞对于 send 来说不那么重要呢?也不尽然,考虑到出现系统发送缓冲区满了的情况,这时我们需要等待缓冲区发送一部分之后,腾出地方来,我们 send 的数据才能拷贝进去,在这种情况下,貌似就要依靠非阻塞才行了,非阻塞下 send 会快速的返回一个失败,好让我们采取其他的措施。
那么,接下来看这个代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <time.h>
#include <sys/epoll.h>
#include <map>
#include <string>
#define _PRINTF_(format, ...) printf("[%s] " format " [%s::%s::%d]\n", getdatetimestr(), ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__)
char* getdatetimestr() {
static char datetimestr[32] = {0};
static time_t last_update_time = 0;
struct timeval tv = {0};
gettimeofday(&tv, NULL);
time_t nowtime = tv.tv_sec;
if (nowtime != last_update_time) {
last_update_time = nowtime;
struct tm nowtm = {0};
localtime_r(&nowtime, &nowtm);
strftime(datetimestr, sizeof(datetimestr), "%Y-%m-%d %H:%M:%S", &nowtm);
}
snprintf(datetimestr+19, sizeof(datetimestr)-19, ".%06ld", tv.tv_usec);
return datetimestr;
}
int ctl_epoll_event(int epoll_fd, int ctl, int fd, int event) {
// EPOLL IN = 0x001,
// EPOLL PRI = 0x002,
// EPOLL OUT = 0x004,
// EPOLL RDNORM = 0x040,
// EPOLL RDBAND = 0x080,
// EPOLL WRNORM = 0x100,
// EPOLL WRBAND = 0x200,
// EPOLL MSG = 0x400,
// EPOLL ERR = 0x008,
// EPOLL HUP = 0x010,
// EPOLL ET = (1 << 31)
// EPOLL_CTL_ADD 1
// EPOLL_CTL_DEL 2
// EPOLL_CTL_MOD 3
//_PRINTF_("epoll_fd %d, ctl %d, fd %d, event %d", epoll_fd, ctl, fd, event);
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
return epoll_ctl(epoll_fd, ctl, fd, &ev);
}
int httpSrv2(short port) {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
_PRINTF_("%m");
return -1;
}
int reuseaddr = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr));
struct sockaddr_in server_addr = {0};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
_PRINTF_("%m");
return -1;
}
if (listen(listen_fd, 32) == -1) {
_PRINTF_("%m");
return -1;
}
_PRINTF_("listening...");
int epoll_fd = epoll_create(4096);
if (ctl_epoll_event(epoll_fd, EPOLL_CTL_ADD, listen_fd, EPOLLIN) == -1) {
_PRINTF_("%m");
return -1;
}
std::map<int, std::string> fd_buffer_map;
for (int count = 0; ; ) {
struct epoll_event epoll_events[64] = {{0}};
int epoll_events_num = epoll_wait(epoll_fd, epoll_events, sizeof(epoll_events)/sizeof(epoll_events[0]), 5*1000);
//_PRINTF_("epoll_events_num, %d", epoll_events_num);
for (int epoll_events_index = 0; epoll_events_index < epoll_events_num; epoll_events_index++) {
int event_fd = epoll_events[epoll_events_index].data.fd;
if (event_fd == listen_fd) {
if (!(epoll_events[epoll_events_index].events & EPOLLIN)) {
_PRINTF_("listen fd event: %d, msg: %m", epoll_events[epoll_events_index].events);
continue;
}
int client_fd = accept(listen_fd, (struct sockaddr*)NULL, NULL);
//int client_fd = accept4(listen_fd, (struct sockaddr*)NULL, NULL, SOCK_NONBLOCK);
if (client_fd == -1) {
_PRINTF_("%m");
continue;
}
ctl_epoll_event(epoll_fd, EPOLL_CTL_ADD, client_fd, EPOLLIN);
} else {
int client_fd = event_fd;
if (epoll_events[epoll_events_index].events & EPOLLIN) {
char recv_buff[64*1024] = {0};
int recv_len = recv(client_fd, recv_buff, sizeof(recv_buff), 0);
if (recv_len < 0) {
_PRINTF_("%m");
continue;
}
fd_buffer_map[client_fd].append(recv_buff, recv_len);
//_PRINTF_("fd %d buffer [%s]", client_fd, fd_buffer_map[client_fd].c_str());
if (fd_buffer_map[client_fd].find("\r\n\r\n") == std::string::npos) {
continue;
}
char response[] = "HTTP/1.1 200 OK\r\n\r\n<h1>Hello World</h1>";
if (send(client_fd, response, sizeof(response)-1, 0) < 0) {
_PRINTF_("%m");
}
if (count++ % 100000 == 0) {
_PRINTF_("count %d, last request [%s], last response [%s]", count, fd_buffer_map[client_fd].c_str(), response);
}
ctl_epoll_event(epoll_fd, EPOLL_CTL_DEL, client_fd, EPOLLIN);
close(client_fd);
fd_buffer_map.erase(client_fd);
} else {
_PRINTF_("client fd event: %d, msg: %m", epoll_events[epoll_events_index].events);
}
}
}
}
close(epoll_fd);
close(listen_fd);
return 0;
}
int main(int argc, char** argv) {
_PRINTF_("starting server...");
httpSrv2(atoi(argv[1]));
return 0;
}
这个代码通过 95/96 行可以切换阻塞和非阻塞模式,在两种模式中,其实服务的吞吐量差不多,但是,阻塞的代码,CPU 跑到了 100%(一个核上 100%,其他核空闲),非阻塞的代码,同样的吞吐率,CPU 只用了不到 10%,这是为什么呢?
————————
2015-7-20 19:37:42 进一步的观察发现,非阻塞的 10% CPU 使用率,只是在进程刚启动的时候,在进程运行一段时间之后,也会跑到 100% 的 CPU