一般来说,现在大家 epoll 都是搭配着非阻塞 IO 一起用的,要问为什么?大家都这么用的啊,而且异步嘛,非阻塞嘛,很自然嘛
但是,非阻塞 IO 具体是怎么对 send 和 recv 起作用的。一般理解,我们之所以要用非阻塞,是为了避免这种情况:
客户端跟我们 tcp 三次握手完了,我们 listen fd 上得到一个 IN 事件了,然后 accept,得到一个新的 client fd,然后如果这个时候,客户端没有发字节流过来,我们调用 recv,就会因为服务器的系统缓冲区中空空如也,陷入阻塞,所以如果这里我们用上非阻塞,就可以让 recv 快速的返回一个 0,从而让 CPU 转而去处理其他 client fd,所以,非阻塞对 recv 的作用效果是很明显的。
但是 send 呢,即使是在阻塞情况下,我们调用 send,系统都是把我们的数据拷贝到系统缓冲区,然后就返回了,而并不需要等到这些数据真正发出来收到 ack 之后才返回,也就是说,这里纯粹是一个内存拷贝的动作,不涉及和远端机器的交互,照理说,应该是不会陷入等待的。那是否就能说阻塞非阻塞对于 send 来说不那么重要呢?也不尽然,考虑到出现系统发送缓冲区满了的情况,这时我们需要等待缓冲区发送一部分之后,腾出地方来,我们 send 的数据才能拷贝进去,在这种情况下,貌似就要依靠非阻塞才行了,非阻塞下 send 会快速的返回一个失败,好让我们采取其他的措施。
那么,接下来看这个代码:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <sys/time.h> #include <time.h> #include <sys/epoll.h> #include <map> #include <string> #define _PRINTF_(format, ...) printf("[%s] " format " [%s::%s::%d]\n", getdatetimestr(), ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__) char* getdatetimestr() { static char datetimestr[32] = {0}; static time_t last_update_time = 0; struct timeval tv = {0}; gettimeofday(&tv, NULL); time_t nowtime = tv.tv_sec; if (nowtime != last_update_time) { last_update_time = nowtime; struct tm nowtm = {0}; localtime_r(&nowtime, &nowtm); strftime(datetimestr, sizeof(datetimestr), "%Y-%m-%d %H:%M:%S", &nowtm); } snprintf(datetimestr+19, sizeof(datetimestr)-19, ".%06ld", tv.tv_usec); return datetimestr; } int ctl_epoll_event(int epoll_fd, int ctl, int fd, int event) { // EPOLL IN = 0x001, // EPOLL PRI = 0x002, // EPOLL OUT = 0x004, // EPOLL RDNORM = 0x040, // EPOLL RDBAND = 0x080, // EPOLL WRNORM = 0x100, // EPOLL WRBAND = 0x200, // EPOLL MSG = 0x400, // EPOLL ERR = 0x008, // EPOLL HUP = 0x010, // EPOLL ET = (1 << 31) // EPOLL_CTL_ADD 1 // EPOLL_CTL_DEL 2 // EPOLL_CTL_MOD 3 //_PRINTF_("epoll_fd %d, ctl %d, fd %d, event %d", epoll_fd, ctl, fd, event); struct epoll_event ev; ev.data.fd = fd; ev.events = event; return epoll_ctl(epoll_fd, ctl, fd, &ev); } int httpSrv2(short port) { int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { _PRINTF_("%m"); return -1; } int reuseaddr = 1; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)); struct sockaddr_in server_addr = {0}; server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(port); if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { _PRINTF_("%m"); return -1; } if (listen(listen_fd, 32) == -1) { _PRINTF_("%m"); return -1; } _PRINTF_("listening..."); int epoll_fd = epoll_create(4096); if (ctl_epoll_event(epoll_fd, EPOLL_CTL_ADD, listen_fd, EPOLLIN) == -1) { _PRINTF_("%m"); return -1; } std::map<int, std::string> fd_buffer_map; for (int count = 0; ; ) { struct epoll_event epoll_events[64] = {{0}}; int epoll_events_num = epoll_wait(epoll_fd, epoll_events, sizeof(epoll_events)/sizeof(epoll_events[0]), 5*1000); //_PRINTF_("epoll_events_num, %d", epoll_events_num); for (int epoll_events_index = 0; epoll_events_index < epoll_events_num; epoll_events_index++) { int event_fd = epoll_events[epoll_events_index].data.fd; if (event_fd == listen_fd) { if (!(epoll_events[epoll_events_index].events & EPOLLIN)) { _PRINTF_("listen fd event: %d, msg: %m", epoll_events[epoll_events_index].events); continue; } int client_fd = accept(listen_fd, (struct sockaddr*)NULL, NULL); //int client_fd = accept4(listen_fd, (struct sockaddr*)NULL, NULL, SOCK_NONBLOCK); if (client_fd == -1) { _PRINTF_("%m"); continue; } ctl_epoll_event(epoll_fd, EPOLL_CTL_ADD, client_fd, EPOLLIN); } else { int client_fd = event_fd; if (epoll_events[epoll_events_index].events & EPOLLIN) { char recv_buff[64*1024] = {0}; int recv_len = recv(client_fd, recv_buff, sizeof(recv_buff), 0); if (recv_len < 0) { _PRINTF_("%m"); continue; } fd_buffer_map[client_fd].append(recv_buff, recv_len); //_PRINTF_("fd %d buffer [%s]", client_fd, fd_buffer_map[client_fd].c_str()); if (fd_buffer_map[client_fd].find("\r\n\r\n") == std::string::npos) { continue; } char response[] = "HTTP/1.1 200 OK\r\n\r\n<h1>Hello World</h1>"; if (send(client_fd, response, sizeof(response)-1, 0) < 0) { _PRINTF_("%m"); } if (count++ % 100000 == 0) { _PRINTF_("count %d, last request [%s], last response [%s]", count, fd_buffer_map[client_fd].c_str(), response); } ctl_epoll_event(epoll_fd, EPOLL_CTL_DEL, client_fd, EPOLLIN); close(client_fd); fd_buffer_map.erase(client_fd); } else { _PRINTF_("client fd event: %d, msg: %m", epoll_events[epoll_events_index].events); } } } } close(epoll_fd); close(listen_fd); return 0; } int main(int argc, char** argv) { _PRINTF_("starting server..."); httpSrv2(atoi(argv[1])); return 0; }
这个代码通过 95/96 行可以切换阻塞和非阻塞模式,在两种模式中,其实服务的吞吐量差不多,但是,阻塞的代码,CPU 跑到了 100%(一个核上 100%,其他核空闲),非阻塞的代码,同样的吞吐率,CPU 只用了不到 10%,这是为什么呢?
————————
2015-7-20 19:37:42 进一步的观察发现,非阻塞的 10% CPU 使用率,只是在进程刚启动的时候,在进程运行一段时间之后,也会跑到 100% 的 CPU