0%

io多路复用select_poll_epoll

1. 基础概念

1.1 内核态和用户态

Linux系统中分为内核态(Kernel model)和用户态(User model),CPU会在两个model之间切换。

  • 内核态代码拥有完全的底层资源控制权限,可以执行任何CPU指令,访问任何内存地址,其占有的处理机是不允许被抢占的。内核态的指令包括:启动I/O,内存清零,修改程序状态字,设置时钟,允许/终止中断和停机。内核态的程序崩溃会导致PC停机。

  • 用户态是用户程序能够使用的指令,不能直接访问底层硬件和内存地址。用户态运行的程序必须委托系统调用来访问硬件和内存。用户态的指令包括:控制转移,算数运算,取数指令,访管指令(使用户程序从用户态陷入内核态)。

1.2 用户态和内核态的切换

用户态切换到内核态有三种方式:

1. 系统调用

这是用户态进程主动要求切换到内核态的一种方式,用户态进程通过系统调用申请使用操作系统提供的服务程序完成工作,比如前例中fork()实际上就是执行了一个创建新进程的系统调用。而系统调用的机制其核心还是使用了操作系统为用户特别开放的一个中断来实现,例如Linux的int 80h中断。

2. 异常

当CPU在执行运行在用户态下的程序时,发生了某些事先不可知的异常,这时会触发由当前运行进程切换到处理此异常的内核相关程序中,也就转到了内核态,比如缺页异常。

3. 外围设备的中断

当外围设备完成用户请求的操作后,会向CPU发出相应的中断信号,这时CPU会暂停执行下一条即将要执行的指令转而去执行与中断信号对应的处理程序,如果先前执行的指令是用户态下的程序,那么这个转换的过程自然也就发生了由用户态到内核态的切换。比如硬盘读写操作完成,系统会切换到硬盘读写的中断处理程序中执行后续操作等。

2. 多服务模型

2.1 多进程模型

服务器的主进程负责监听客户的连接,一旦与客户端连接完成,accept() 函数就会返回一个「已连接 Socket」,这时就通过 fork() 函数创建一个子进程,实际上就把父进程所有相关的东西都复制一份,包括文件描述符、内存地址空间、程序计数器、执行的代码等。

这两个进程刚复制完的时候,几乎一模一样。不过,会根据返回值来区分是父进程还是子进程,如果返回值是 0,则是子进程;如果返回值是其他的整数,就是父进程。

正因为子进程会复制父进程的文件描述符,于是就可以直接使用「已连接 Socket 」和客户端通信了,

img

可以发现,子进程不需要关心「监听 Socket」,只需要关心「已连接 Socket」;父进程则相反,将客户服务交给子进程来处理,因此父进程不需要关心「已连接 Socket」,只需要关心「监听 Socket」。

2.2 多线程模型

当服务器与客户端 TCP 完成连接后,通过 pthread_create() 函数创建线程,然后将「已连接 Socket」的文件描述符传递给线程函数,接着在线程里和客户端进行通信,从而达到并发处理的目的。

如果每来一个连接就创建一个线程,线程运行完后,还得操作系统还得销毁线程,虽说线程切换的上写文开销不大,但是如果频繁创建和销毁线程,系统开销也是不小的。

那么,我们可以使用线程池的方式来避免线程的频繁创建和销毁,所谓的线程池,就是提前创建若干个线程,这样当由新连接建立时,将这个已连接的 Socket 放入到一个队列里,然后线程池里的线程负责从队列中取出「已连接 Socket 」进行处理。

img

上面基于进程或者线程模型的,其实还是有问题的。新到来一个 TCP 连接,就需要分配一个进程或者线程,那么如果要达到 C10K(同时处理 10000 个并发连接的能力),意味着要一台机器维护 1 万个连接,相当于要维护 1 万个进程/线程,操作系统就算死扛也是扛不住的。

2. IO多路复用

为每个客户端创建一个线程,服务器端的线程资源很容易被耗光。当然还有个聪明的办法,我们可以每 accept 一个客户端连接后,将这个文件描述符(connfd)放到一个数组里。然后弄一个新的线程去不断遍历这个数组,调用每一个元素的非阻塞 read 方法。

1
2
3
4
5
6
7
8
9
10
accept  ->  fdlist.add(connfd);


while(1) {
for(fd <-- fdlist) {
if(read(fd) != -1) {
doSomeThing();
}
}
}

但这和我们用多线程去将阻塞 IO 改造成看起来是非阻塞 IO 一样,这种遍历方式也只是我们用户自己想出的小把戏,每次遍历遇到 read 返回 -1 时仍然是一次浪费资源的系统调用。在 while 循环里做系统调用,就好比你做分布式项目时在 while 里做 rpc 请求一样,是不划算的。

所以,还是得恳请操作系统老大,提供给我们一个有这样效果的函数,我们将一批文件描述符通过一次系统调用传给内核,由内核层去遍历,才能真正解决这个问题。

2.1 select

select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。

所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。

1
2
3
4
5
6
int select(
int nfds, // nfds:监控的文件描述符集里最大文件描述符加1
fd_set *readfds, // readfds:监控有读数据到达文件描述符集合,传入传出参数
fd_set *writefds, // writefds:监控写数据到达文件描述符集合,传入传出参数
fd_set *exceptfds, // exceptfds:监控异常发生达文件描述符集合, 传入传出参数
struct timeval *timeout); // timeout:定时阻塞监控时间,3种情况 1.NULL,永远等下去 2.设置timeval,等待固定时间 3.设置timeval里时间均为0,检查描述字后立即返回,轮询

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有异常),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

首先一个线程不断接受客户端连接,并把 socket 文件描述符放到一个 list 里。

1
2
3
4
5
while(1) {
connfd = accept(listenfd);
fcntl(connfd, F_SETFL, O_NONBLOCK);
fdlist.add(connfd);
}

然后,另一个线程不再自己遍历,而是调用 select,将这批文件描述符 list 交给操作系统去遍历。

1
2
3
4
5
6
while(1) {
// 把一堆文件描述符 list 传给 select 函数
// 有已就绪的文件描述符就返回,nready 表示有多少个就绪的
nready = select(list);
...
}

只不过,操作系统会将准备就绪的文件描述符做上标识,用户层将不会再有无意义的系统调用开销。

1
2
3
4
5
6
7
8
9
10
11
12
while(1) {
nready = select(list);
// 用户层依然要遍历,只不过少了很多无效的系统调用
for(fd <-- fdlist) {
if(fd != -1) {
// 只读已就绪的文件描述符
read(fd, buf);
// 总共只有 nready 个已就绪描述符,不用过多遍历
if(--nready == 0) break;
}
}
}

img

可以看出几个细节:

  1. select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)
  2. select 在内核层仍然是通过遍历的方式检查文件描述符的就绪状态,是个同步过程,只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)
  3. select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)
  4. 另外select 还有1024的限制。
img

可以看到,这种方式,既做到了一个线程处理多个客户端连接(文件描述符),又减少了系统调用的开销(多个文件描述符只有一次 select 的系统调用 + n 次就绪状态的文件描述符的 read 系统调用)。

2.2 poll

只解除了select 数量1024的限制

1
2
3
4
5
6
7
8
9
int poll(struct pollfd *fds, nfds_tnfds, int timeout);



struct pollfd {
intfd; /*文件描述符*/
shortevents; /*监控的事件*/
shortrevents; /*监控事件中满足条件返回的事件*/
};

poll和select非常相似,poll并没着手解决性能问题,poll只是解决了select的问题 fds集合大小1024限制问题。所以是个鸡肋。

2.3 epoll

一次系统调用 + 内核层遍历这些文件描述符

还记得上面说的 select 的三个细节么?

  1. select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)
  2. select 在内核层仍然是通过遍历的方式检查文件描述符的就绪状态,是个同步过程,只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)
  3. select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)

所以 epoll 主要就是针对这三点进行了改进。

  1. 内核中保存一份文件描述符集合,无需用户每次都重新传入,只需告诉内核修改的部分即可。
  2. 内核不再通过轮询的方式找到就绪的文件描述符,而是通过异步 IO 事件唤醒。
  3. 内核仅会将有 IO 事件的文件描述符返回给用户,用户也无需遍历整个文件描述符集合。

具体,操作系统提供了这三个函数。

img
第一步,创建一个 epoll 句柄
1
int epoll_create(int size);

size用来告诉内核这个监听的数目一共有多大。新版本用红黑树,这个参数意义不大了。

第二步,向内核添加、修改或删除要监控的文件描述符。
1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfd:是epoll_create()的返回值。

  • op:表示op操作,分别添加、删除和修改对fd的监听事件。

    • 添加EPOLL_CTL_ADD,
    • 删除EPOLL_CTL_DEL,
    • 修改EPOLL_CTL_MOD。
  • fd:是需要监听的fd(文件描述符)

  • epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

    1
    2
    3
    4
    struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };

    events可以是以下几个宏的集合:

    • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    • EPOLLOUT:表示对应的文件描述符可以写;
    • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    • EPOLLERR:表示对应的文件描述符发生错误;
    • EPOLLHUP:表示对应的文件描述符被挂断;
    • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
第三步,epoll_wait 调用
1
int epoll_wait(int epfd, struct epoll_event *events, int max events, int timeout);

img

等待epfd上的io事件,最多返回maxevents个事件。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size

参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。

该函数返回需要处理的事件数目,如返回0表示已超时。

3. 为什么有 epoll

3.1 io 的演变

一切的开始,都起源于这个 read 函数是操作系统提供的,而且是阻塞的,我们叫它 阻塞 IO

为了破这个局,程序员在用户态通过多线程来防止主线程卡死。

后来操作系统发现这个需求比较大,于是在操作系统层面提供了非阻塞的 read 函数,这样程序员就可以在一个线程内完成多个文件描述符的读取,这就是 非阻塞 IO

但多个文件描述符的读取就需要遍历,当高并发场景越来越多时,用户态遍历的文件描述符也越来越多,相当于在 while 循环里进行了越来越多的系统调用。

后来操作系统又发现这个场景需求量较大,于是又在操作系统层面提供了这样的遍历文件描述符的机制,这就是 IO 多路复用

多路复用有三个函数,最开始是 select,然后又发明了 poll 解决了 select 文件描述符的限制,然后又发明了 epoll 解决 select 的三个不足。

3.2 epoll的意义

所以,IO 模型的演进,其实就是时代的变化,倒逼着操作系统将更多的功能加到自己的内核而已。如果你建立了这样的思维,很容易发现网上的一些错误。

比如好多文章说,多路复用之所以效率高,是因为用一个线程就可以监控多个文件描述符。

这显然是知其然而不知其所以然,多路复用产生的效果,完全可以由用户态去遍历文件描述符并调用其非阻塞的 read 函数实现。而多路复用快的原因在于,操作系统提供了这样的系统调用,使得原来的 while 循环里多次系统调用,变成了一次系统调用 + 内核层遍历这些文件描述符。

就好比我们平时写业务代码,把原来 while 循环里调 http 接口进行批量,改成了让对方提供一个批量添加的 http 接口,然后我们一次 rpc 请求就完成了批量添加一个道理。

3.3 epoll使用了mmap了吗

不少博客中提到,epoll_wait返回时,对于就绪的事件,epoll使用的是共享内存的方式,即用户态和内核态都指向了就绪链表,所以就避免了内存拷贝消耗。

这是错的!看过 epoll 内核源码的都知道,压根就没有使用共享内存这个玩意。你可以从下面这份代码看到, epoll_wait 实现的内核代码中调用了 __put_user 函数,这个函数就是将数据从内核拷贝到用户空间。

img

3.4 epoll 边缘触发(ET)和水平触发(LT)

epoll 支持两种事件触发模式,分别是边缘触发(edge-triggered,ET)和水平触发(level-triggered,LT)。epoll 默认的触发模式是水平触发。

使用边缘触发模式时,当被监控的 Socket 描述符上有可读事件发生时,服务器端只会从 epoll_wait 中苏醒一次,即使进程没有调用 read 函数从内核读取数据,也依然只苏醒一次,因此我们程序要保证一次性将内核缓冲区的数据读取完;

使用水平触发模式时,当被监控的 Socket 上有可读事件发生时,服务器端不断地从 epoll_wait 中苏醒,直到内核缓冲区数据被 read 函数读完才结束,目的是告诉我们有数据需要读取;

水平触发的意思是只要满足事件的条件,比如内核中有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。

如果使用边缘触发模式,I/O 事件发生时只会通知一次,而且我们不知道到底能读写多少数据,所以在收到通知后应尽可能地读写数据,以免错失读写的机会。因此,我们会循环从文件描述符读写数据,那么如果文件描述符是阻塞的,没有数据可读写时,进程会阻塞在读写函数那里,程序就没办法继续往下执行。所以,边缘触发模式一般和非阻塞 I/O 搭配使用,程序会一直执行 I/O 操作,直到系统调用(如 read 和 write)返回错误,错误类型为 EAGAIN 或 EWOULDBLOCK。

一般来说,边缘触发的效率比水平触发的效率要高,因为边缘触发可以减少 epoll_wait 的系统调用次数,系统调用也是有一定的开销的的,毕竟也存在上下文的切换。

select/poll 只有水平触发模式,epoll 默认的触发模式是水平触发,但是可以根据应用场景设置为边缘触发模式。

4. 代码

4.1 select

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <ctype.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"

#define PORT 8000
#define MAXLINE 1024
int main()
{
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int server_id = Socket(PF_INET, SOCK_STREAM, 0);

struct sockaddr_in server, client;
bzero(&server, sizeof(server));
server.sin_family = PF_INET;
server.sin_port = htons(PORT);
server.sin_addr.s_addr = htonl(INADDR_ANY);

int opt = 1;
setsockopt(server_id, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

Bind(server_id, (struct sockaddr*)&server, sizeof(server));
Listen(server_id, 20);
printf("Accept connections...\n");

int clients[FD_SETSIZE];
for (int i = 0; i < FD_SETSIZE; ++i) {
clients[i] = -1;
}
fd_set rset, allset;
FD_ZERO(&allset);
FD_SET(server_id, &allset);
int maxfd = server_id;
int maxi = -1;

while (1) {
rset = allset;
// 只监听读描述符
int iready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (iready < 0) {
perr_exit("select error");
}

if (FD_ISSET(server_id, &rset)) {
// 说明有新的 client 写
socklen_t len = sizeof(client);
int client_id = Accept(server_id, (struct sockaddr*)&client, &len);
printf("received from %s at PORT %d\n",
inet_ntop(PF_INET, &client.sin_addr, str, sizeof(str)),
ntohs(client.sin_port));

int i = 0;
for (; i < FD_SETSIZE; ++i) {
if (clients[i] < 0) {
clients[i] = client_id;
break;
}
}
if (i == FD_SETSIZE) {
fputs("too many clients\n", stderr);
exit(1);
}
FD_SET(client_id, &allset);
if (client_id > maxfd) {
maxfd = client_id;
}
if (i > maxi) {
maxi = i;
}
if (--iready == 0) {
continue;
}
}

for (int i = 0; i <= maxi; ++i) {
int fd = clients[i];
if (fd < 0) {
continue;
}
if (FD_ISSET(fd, &rset)) {
int n = Read(fd, buf, sizeof(buf));
if (n == 0) {
Close(fd);
FD_CLR(fd, &allset);
clients[i] = -1;
} else {
for (int i = 0; i < n; ++i) {
buf[i] = toupper(buf[i]);
}
Write(fd, buf, n);
}
if (--iready == 0) {
break;
}
}
}
}

return 0;
}

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <ctype.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"


#define PORT 8000
#define MAXLINE 1024
int main(int argc, char* agrv[])
{
char buf[MAXLINE];
memset(buf, 0, sizeof(buf));
int server_id = Socket(PF_INET, SOCK_STREAM, 0);

struct sockaddr_in server;
bzero(&server, sizeof(server));
server.sin_family = PF_INET;
server.sin_port = htons(PORT);
inet_pton(PF_INET, "127.0.0.1", &server.sin_addr);

Connect(server_id, (struct sockaddr*)&server, sizeof(server));

while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(server_id, buf, strlen(buf));
int n = Read(server_id, buf, MAXLINE);
if (n == 0) {
printf("the other side has been closed.\n");
} else {
Write(STDOUT_FILENO, buf, n);
}
}
Close(server_id);
return 0;
}

4.2 poll

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <ctype.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include "wrap.h"

#define PORT 8000
#define MAXLINE 1024
#define OPEN_MAX 1000
int main()
{
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int server_id = Socket(PF_INET, SOCK_STREAM, 0);

struct sockaddr_in server, client;
bzero(&server, sizeof(server));
server.sin_family = PF_INET;
server.sin_port = htons(PORT);
server.sin_addr.s_addr = htonl(INADDR_ANY);

int opt = 1;
setsockopt(server_id, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

Bind(server_id, (struct sockaddr*)&server, sizeof(server));
Listen(server_id, 20);
printf("Accept connections...\n");


struct pollfd clients[OPEN_MAX];
clients[0].fd = server_id;
clients[0].events = POLLIN;
for (int i = 1; i < OPEN_MAX; i++) {
clients[i].fd = -1;
}

int maxi = 0;
while (1) {
// 监听 POLLIN 事件
int iready = poll(clients, maxi+1, -1);
if (iready < 0) {
perr_exit("poll error");
}

// 说明 client 来了写
if (clients[0].revents & POLLIN) {
socklen_t len = sizeof(client);
int client_id = Accept(server_id, (struct sockaddr*)&client, &len);
printf("received from %s at PORT %d\n",
inet_ntop(PF_INET, &client.sin_addr, str, sizeof(str)),
ntohs(client.sin_port));

int i = 1;
for (; i < OPEN_MAX; ++i) {
if (clients[i].fd < 0) {
clients[i].fd = client_id;
break;
}
}

if (i == OPEN_MAX) {
fputs("too many clients\n", stderr);
exit(1);
}

clients[i].events = POLLIN;
if (i > maxi) {
maxi = i;
}
if (--iready == 0) {
continue;
}
}

for (int i = 1; i <= maxi; ++i) {
if (clients[i].fd < 0) {
continue;
}

if (clients[i].revents & POLLIN) {
int n = Read(clients[i].fd, buf, sizeof(buf));
if (n == 0) {
Close(clients[i].fd);
clients[i].fd = -1;
} else {
for (int i = 0; i < n; ++i) {
buf[i] = toupper(buf[i]);
}
Write(clients[i].fd, buf, n);
}
if (--iready == 0) {
break;
}
}
}
}
return 0;
}

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <ctype.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"


#define PORT 8000
#define MAXLINE 1024
int main(int argc, char* agrv[])
{
char buf[MAXLINE];
memset(buf, 0, sizeof(buf));
int server_id = Socket(PF_INET, SOCK_STREAM, 0);

struct sockaddr_in server;
bzero(&server, sizeof(server));
server.sin_family = PF_INET;
server.sin_port = htons(PORT);
inet_pton(PF_INET, "127.0.0.1", &server.sin_addr);

Connect(server_id, (struct sockaddr*)&server, sizeof(server));

while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(server_id, buf, strlen(buf));
int n = Read(server_id, buf, MAXLINE);
if (n == 0) {
printf("the other side has been closed.\n");
} else {
Write(STDOUT_FILENO, buf, n);
}
}
Close(server_id);
return 0;
}

4.3 epoll

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <ctype.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include "wrap.h"

#define PORT 8000
#define MAXLINE 1024
#define OPEN_MAX 1000

void add_event(int epollid, int fd, int state)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollid, EPOLL_CTL_ADD, fd, &ev);
}
void modify_event(int epollid, int fd, int state)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollid, EPOLL_CTL_MOD, fd, &ev);
}
void delete_event(int epollid, int fd, int state)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollid, EPOLL_CTL_DEL, fd, &ev);
}


int main()
{
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int server_id = Socket(PF_INET, SOCK_STREAM, 0);

struct sockaddr_in server, client;
bzero(&server, sizeof(server));
server.sin_family = PF_INET;
server.sin_port = htons(PORT);
server.sin_addr.s_addr = htonl(INADDR_ANY);

int opt = 1;
setsockopt(server_id, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

Bind(server_id, (struct sockaddr*)&server, sizeof(server));
Listen(server_id, 20);
printf("Accept connections...\n");


struct epoll_event events[EPOLLEVENTS];
int epollfd = epoll_create(FDSIZE);

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = STDIN_FILENO;
epoll_ctl(epollfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);

while (1) {
int ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
for (int i = 0; i < ret; ++i) {
int fd = events[i].data.fd;
if (fd == server_id && (events[i].events & EPOLLIN)) {
socklen_t len = sizeof(client);
int client_id = Accept(server_id, (struct sockaddr*)&client, &len);
printf("received from %s at PORT %d\n",
inet_ntop(PF_INET, &client.sin_addr, str, sizeof(str)),
ntohs(client.sin_port));

struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);

} else if (events[i].events & EPOLLIN) {
int n = Read(clients[i].fd, buf, sizeof(buf));
if (n == 0) {
Close(clients[i].fd);

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);

} else {
struct epoll_event ev;
ev.events = EPOLLOUT;//由读改为写
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

} else if (events[i].events & EPOLLOUT) {
for (int i = 0; i < n; ++i) {
buf[i] = toupper(buf[i]);
}
int n = Write(fd, buf, n);
if (n < 0) {
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);

} else {
struct epoll_event ev;
ev.events = EPOLLIN;//由写改为读
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
}
}
}

Close(epollfd);


return 0;
}

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "wrap.h"
#include "epollUtil.h"

#define IP "127.0.0.1"
#define PORT 8000
#define FD_SIZE 1024
#define EPOLLEVENTS 20
int main(int agrc, char* argv[]) {
char buf[1024];
memset(buf, 0, sizeof(buf));

struct sockaddr_in server;
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(PORT);
inet_pton(AF_INET, IP, &server.sin_addr);


int server_id = Socket(AF_INET, SOCK_STREAM, 0);
Connect(server_id, (struct sockaddr*)&server, sizeof(server));

struct epoll_event events[EPOLLEVENTS];
int epollfd = epoll_create(FD_SIZE);
add_event(epollfd, STDIN_FILENO, EPOLLIN);
while (1) {

int ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
for (int i = 0; i < ret; ++i) {
int fd = events[i].data.fd;

if (events[i].events & EPOLLIN) {
int n = Read(fd, buf, sizeof(buf));
if (n == 0) {
Close(fd);
} else {
if (fd == STDIN_FILENO) {
add_event(epollfd, server_id, EPOLLOUT);
} else {
delete_event(epollfd, server_id, EPOLLIN);
add_event(epollfd, STDOUT_FILENO, EPOLLOUT);
}
}
} else if (events[i].events & EPOLLOUT) {
Write(fd, buf, strlen(buf));
if (fd == STDOUT_FILENO) {
delete_event(epollfd, fd, EPOLLOUT);
} else {
modify_event(epollfd, fd, EPOLLIN);
}
}
}
}

Close(server_id);
return 0;
}

4.4 总结

  • select

    死循环里用 select 阻塞, 返回后开始遍历

  • poll

    死循环里用 poll 阻塞, 返回后开始遍历

  • epoll

    死循环里用 epoll_wait 阻塞

5. 头脑风暴

5.1 【多路复用】VS 【多线程+ 阻塞IO】

也许有朋友会说,我可以采用 多线程+ 阻塞IO 达到类似的效果,但是由于在多线程 + 阻塞IO 中,每个socket对应一个线程,这样会造成很大的资源占用,并且尤其是对于长连接来说,线程的资源一直不会释放,如果后面陆续有很多连接的话,就会造成性能上的瓶颈。

而多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况。

另外多路复用IO为何比非阻塞IO模型的效率高是因为在非阻塞IO中,不断地询问socket状态时通过用户线程去进行的,而在多路复用IO中,轮询每个socket状态是内核在进行的,这个效率要比用户线程要高的多。

5.2 我在知乎的回答

https://www.zhihu.com/question/32163005/answer/300165049

IO模式一般分为同步IO和异步IO. 同步IO会阻塞进程, 异步IO不会阻塞进程. 目前linux上大部分用的是同步IO, 异步IO在linux上目前还不成熟, 不过windows的iocp算是真正的异步IO。

同步IO又分为阻塞IO, 非阻塞IO, IO多路复用. What? 同步IO明明会阻塞进程,为什么也包括非阻塞IO? 因为非阻塞IO虽然在请求数据时不阻塞, 但真正数据来临时,也就是内核数据拷贝到用户数据时, 此时进程是阻塞的.

那么这些IO模式的区别分别是什么? 接下来举个小例子来说明. 假设你现在去女生宿舍楼找自己的女神, 但是你只知道女神的手机号,并不知道女神的具体房间

先说同步IO的情况,

  1. 阻塞IO, 给女神发一条短信, 说我来找你了, 然后就默默的一直等着女神下楼, 这个期间除了等待你不会做其他事情, 属于备胎做法.
  1. 非阻塞IO, 给女神发短信, 如果不回, 接着再发, 一直发到女神下楼, 这个期间你可以在两次发短信间隙喝口水,属于专一做法.
  1. IO多路复用, 是找一个宿管大妈来帮你监视下楼的女生, 这个期间你可以些其他的事情. 例如可以顺便看看其他妹子,玩玩王者荣耀, 上个厕所等等. IO复用又包括 select, poll, epoll 模式. 那么它们的区别是什么?

3.1 select大妈 每一个女生下楼, select大妈都不知道这个是不是你的女神, 她需要一个一个询问, 并且select大妈能力还有限, 最多一次帮你监视1024个妹子

3.2 poll大妈不限制盯着女生的数量, 只要是经过宿舍楼门口的女生, 都会帮你去问是不是你女神

3.3 epoll大妈不限制盯着女生的数量, 并且也不需要一个一个去问. 那么如何做呢? epoll大妈会为每个进宿舍楼的女生脸上贴上一个大字条,上面写上女生自己的名字, 只要女生下楼了, epoll大妈就知道这个是不是你女神了, 然后大妈再通知你.

上面这些同步IO有一个共同点就是, 当女神走出宿舍门口的时候, 你已经站在宿舍门口等着女神的, 此时你属于阻塞状态

接下来是异步IO的情况

你告诉女神我来了, 然后你就去王者荣耀了, 一直到女神下楼了, 发现找不见你了, 女神再给你打电话通知你, 说我下楼了, 你在哪呢? 这时候你才来到宿舍门口. 此时属于逆袭做法

6. 参考资料

给作者打赏,可以加首页微信,咨询作者相关问题!