【Linux C编程】第十八章 高并发服务器(三)
epoll工作原理:
通过下面的伪代码有助于上面的理解:
epoll伪代码
1 int main()
2 {
3 // 创建监听的套接字
4 int lfd = socket();
5 // 绑定
6 bind();
7 // 监听
8 listen();
9
10 // epoll树根节点
11 int epfd = epoll_create(3000);
12 // 存储发送变化的fd对应信息
13 struct epoll_event all[3000];
14 // init
15 // 监听的lfd挂到epoll树上
16 struct epoll_event ev;
17 // 在ev中init lfd信息
18 ev.events = EPOLLIN ;
19 ev.data.fd = lfd;
20 epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
21 while(1)
22 {
23 // 委托内核检测事件
24 int ret = epoll_wait(epfd, all, 3000, -1);
25 // 根据ret遍历all数组
26 for(int i=0; i<ret; ++i)
27 {
28 int fd = all[i].data.fd;
29 // 有新的连接
30 if( fd == lfd)
31 {
32 // 接收连接请求 - accept不阻塞
33 int cfd = accept();
34 // cfd上树
35 ev.events = EPOLLIN;
36 ev.data.fd = cfd;
37 epoll_ctl(epfd, epoll_ctl_add, cfd, &ev);
38 }
39 // 已经连接的客户端有数据发送过来
40 else
41 {
42 // 只处理客户端发来的数据
43 if(!all[i].events & EPOLLIN)
44 {
45 continue;
46 }
47 // 读数据
48 int len = recv();
49 if(len == 0)
50 {
51 close(fd);
52 // 检测的fd从树上删除
53 epoll_ctl(epfd, epoll_ctl_del, fd, NULL);
54 }
55 // 写数据
56 send();
57 }
58 }
59 }
60 }
示例:
epoll.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <string.h>
6 #include <sys/socket.h>
7 #include <arpa/inet.h>
8 #include <ctype.h>
9 #include <sys/epoll.h>
10
11
12 int main(int argc, const char* argv[])
13 {
14 if(argc < 2)
15 {
16 printf("eg: ./a.out port\n");
17 exit(1);
18 }
19 struct sockaddr_in serv_addr;
20 socklen_t serv_len = sizeof(serv_addr);
21 int port = atoi(argv[1]);
22
23 // 创建套接字
24 int lfd = socket(AF_INET, SOCK_STREAM, 0);
25 // 初始化服务器 sockaddr_in
26 memset(&serv_addr, 0, serv_len);
27 serv_addr.sin_family = AF_INET; // 地址族
28 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP
29 serv_addr.sin_port = htons(port); // 设置端口
30 // 绑定IP和端口
31 bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
32
33 // 设置同时监听的最大个数
34 listen(lfd, 36);
35 printf("Start accept ......\n");
36
37 struct sockaddr_in client_addr;
38 socklen_t cli_len = sizeof(client_addr);
39
40 // 创建epoll树根节点
41 int epfd = epoll_create(2000);
42 // 初始化epoll树
43 struct epoll_event ev;
44 ev.events = EPOLLIN;
45 ev.data.fd = lfd;
46 epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
47
48 struct epoll_event all[2000];
49 while(1)
50 {
51 // 使用epoll通知内核fd 文件IO检测 sizeof(all)/sizeof(all[0]) --> sizeof(struct epoll_event)
52 int ret = epoll_wait(epfd, all, sizeof(all)/sizeof(all[0]), -1);
53
54 // 遍历all数组中的前ret个元素
55 for(int i=0; i<ret; ++i)
56 {
57 int fd = all[i].data.fd;
58 // 判断是否有新连接
59 if(fd == lfd)
60 {
61 // 接受连接请求
62 int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
63 if(cfd == -1)
64 {
65 perror("accept error");
66 exit(1);
67 }
68 // 将新得到的cfd挂到树上
69 struct epoll_event temp;
70 temp.events = EPOLLIN;
71 temp.data.fd = cfd;
72 epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &temp);
73
74 // 打印客户端信息
75 char ip[64] = {0};
76 printf("New Client IP: %s, Port: %d\n",
77 inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),
78 ntohs(client_addr.sin_port));
79
80 }
81 else
82 {
83 // 处理已经连接的客户端发送过来的数据
84 if(!all[i].events & EPOLLIN)
85 {
86 continue;
87 }
88
89 // 读数据
90 char buf[1024] = {0};
91 int len = recv(fd, buf, sizeof(buf), 0);
92 if(len == -1)
93 {
94 perror("recv error");
95 exit(1);
96 }
97 else if(len == 0)
98 {
99 printf("client disconnected ....\n");
100 // fd从epoll树上删除
101 ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
102 if(ret == -1)
103 {
104 perror("epoll_ctl - del error");
105 exit(1);
106 }
107 close(fd);
108 }
109 else
110 {
111 printf(" recv buf: %s\n", buf);
112 write(fd, buf, len);
113 }
114 }
115 }
116 }
117
118 close(lfd);
119 return 0;
120 }
client.c
1 /* client.c */
2 #include <stdio.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <netinet/in.h>
6 #include "wrap.h"
7
8 #define MAXLINE 80
9 #define SERV_PORT 6666
10
11 int main(int argc, char *argv[])
12 {
13 struct sockaddr_in servaddr;
14 char buf[MAXLINE];
15 int sockfd, n;
16
17 sockfd = Socket(AF_INET, SOCK_STREAM, 0);
18
19 bzero(&servaddr, sizeof(servaddr));
20 servaddr.sin_family = AF_INET;
21 inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
22 servaddr.sin_port = htons(SERV_PORT);
23
24 Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
25
26 while (fgets(buf, MAXLINE, stdin) != NULL) {
27 Write(sockfd, buf, strlen(buf));
28 n = Read(sockfd, buf, MAXLINE);
29 if (n == 0)
30 printf("the other side has been closed.\n");
31 else
32 Write(STDOUT_FILENO, buf, n);
33 }
34
35 Close(sockfd);
36 return 0;
37 }
注意:epoll_wait 调用次数越多, 系统的开销越大
四、epoll进阶
1. 事件模型
EPOLL事件有两种模型:
Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。
Level Triggered (LT) 水平触发只要有数据都会触发。
思考如下步骤:
1)假定我们已经把一个用来从管道中读取数据的文件描述符(RFD)添加到epoll描述符。
2)管道的另一端写入了2KB的数据
3)调用epoll_wait,并且它会返回RFD,说明它已经准备好读取操作
4)读取1KB的数据
5)调用epoll_wait……
在这个过程中,有两种工作模式:
(1)ET模式
ET模式即Edge Triggered工作模式。
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在
等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩
余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面
会介绍避免可能的缺陷。
- 基于非阻塞文件句柄
- 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的
读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
(2)LT模式
LT模式即Level Triggered工作模式。
与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。
LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任
何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再
为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。
2. 实例一
基于管道epoll ET触发模式
et_epoll.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <sys/epoll.h>
4 #include <errno.h>
5 #include <unistd.h>
6
7 #define MAXLINE 10
8
9 int main(int argc, char *argv[])
10 {
11 int efd, i;
12 int pfd[2];
13 pid_t pid;
14 char buf[MAXLINE], ch = 'a';
15
16 pipe(pfd);
17 pid = fork();
18 if (pid == 0) {
19 close(pfd[0]);
20 while (1) {
21 for (i = 0; i < MAXLINE/2; i++)
22 buf[i] = ch;
23 buf[i-1] = '\n';
24 ch++;
25
26 for (; i < MAXLINE; i++)
27 buf[i] = ch;
28 buf[i-1] = '\n';
29 ch++;
30
31 write(pfd[1], buf, sizeof(buf));
32 sleep(2);
33 }
34 close(pfd[1]);
35 } else if (pid > 0) {
36 struct epoll_event event;
37 struct epoll_event resevent[10];
38 int res, len;
39 close(pfd[1]);
40
41 efd = epoll_create(10);
42 /* event.events = EPOLLIN; */
43 event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
44 event.data.fd = pfd[0];
45 epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);
46
47 while (1) {
48 res = epoll_wait(efd, resevent, 10, -1);
49 printf("res %d\n", res);
50 if (resevent[0].data.fd == pfd[0]) {
51 len = read(pfd[0], buf, MAXLINE/2);
52 write(STDOUT_FILENO, buf, len);
53 }
54 }
55 close(pfd[0]);
56 close(efd);
57 } else {
58 perror("fork");
59 exit(-1);
60 }
61 return 0;
62 }
3. 实例二
基于网络C/S模型的epoll ET触发模式
et_epoll.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <string.h>
6 #include <sys/socket.h>
7 #include <arpa/inet.h>
8 #include <ctype.h>
9 #include <sys/epoll.h>
10
11
12 int main(int argc, const char* argv[])
13 {
14 if(argc < 2)
15 {
16 printf("eg: ./a.out port\n");
17 exit(1);
18 }
19 struct sockaddr_in serv_addr;
20 socklen_t serv_len = sizeof(serv_addr);
21 int port = atoi(argv[1]);
22
23 // 创建套接字
24 int lfd = socket(AF_INET, SOCK_STREAM, 0);
25 // 初始化服务器 sockaddr_in
26 memset(&serv_addr, 0, serv_len);
27 serv_addr.sin_family = AF_INET; // 地址族
28 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP
29 serv_addr.sin_port = htons(port); // 设置端口
30 // 绑定IP和端口
31 bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
32
33 // 设置同时监听的最大个数
34 listen(lfd, 36);
35 printf("Start accept ......\n");
36
37 struct sockaddr_in client_addr;
38 socklen_t cli_len = sizeof(client_addr);
39
40 // 创建epoll树根节点
41 int epfd = epoll_create(2000);
42 // 初始化epoll树
43 struct epoll_event ev;
44
45 // 设置边沿触发
46 ev.events = EPOLLIN | EPOLLET;
47 ev.data.fd = lfd;
48 epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
49
50 struct epoll_event all[2000];
51 while(1)
52 {
53 // 使用epoll通知内核fd 文件IO检测
54 int ret = epoll_wait(epfd, all, sizeof(all)/sizeof(all[0]), -1);
55 printf("================== epoll_wait =============\n");
56
57 // 遍历all数组中的前ret个元素
58 for(int i=0; i<ret; ++i)
59 {
60 int fd = all[i].data.fd;
61 // 判断是否有新连接
62 if(fd == lfd)
63 {
64 // 接受连接请求
65 int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
66 if(cfd == -1)
67 {
68 perror("accept error");
69 exit(1);
70 }
71 // 将新得到的cfd挂到树上
72 struct epoll_event temp;
73 // 设置边沿触发
74 temp.events = EPOLLIN | EPOLLET;
75 temp.data.fd = cfd;
76 epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &temp);
77
78 // 打印客户端信息
79 char ip[64] = {0};
80 printf("New Client IP: %s, Port: %d\n",
81 inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),
82 ntohs(client_addr.sin_port));
83
84 }
85 else
86 {
87 // 处理已经连接的客户端发送过来的数据
88 if(!all[i].events & EPOLLIN)
89 {
90 continue;
91 }
92
93 // 读数据
94 char buf[5] = {0};
95 int len = recv(fd, buf, sizeof(buf), 0);
96 if(len == -1)
97 {
98 perror("recv error");
99 exit(1);
100 }
101 else if(len == 0)
102 {
103 printf("client disconnected ....\n");
104 // fd从epoll树上删除
105 ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
106 if(ret == -1)
107 {
108 perror("epoll_ctl - del error");
109 exit(1);
110 }
111 close(fd);
112 }
113 else
114 {
115 // printf(" recv buf: %s\n", buf);
116 write(STDOUT_FILENO, buf, len);
117 write(fd, buf, len);
118 }
119 }
120 }
121 }
122
123 close(lfd);
124 return 0;
125 }
tcp_client.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <string.h>
7 #include <arpa/inet.h>
8 #include <fcntl.h>
9
10 // tcp client
11 int main(int argc, const char* argv[])
12 {
13 if(argc < 2)
14 {
15 printf("eg: ./a.out port\n");
16 exit(1);
17 }
18 // 创建套接字
19 int fd = socket(AF_INET, SOCK_STREAM, 0);
20 if(fd == -1)
21 {
22 perror("socket error");
23 exit(1);
24 }
25 int port = atoi(argv[1]);
26 // 连接服务器
27 struct sockaddr_in serv_addr;
28 memset(&serv_addr, 0, sizeof(serv_addr));
29 serv_addr.sin_family = AF_INET;
30 serv_addr.sin_port = htons(port);
31 inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr);
32 int ret = connect(fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
33 if(ret == -1)
34 {
35 perror("connect error");
36 exit(1);
37 }
38
39 // 通信
40 while(1)
41 {
42 // 写数据
43 // 接收键盘输入
44 char buf[512];
45 fgets(buf, sizeof(buf), stdin);
46 // 发送给服务器
47 write(fd, buf, strlen(buf)+1);
48
49 // 接收服务器端的数据
50 int len = read(fd, buf, sizeof(buf));
51 printf("read buf = %s, len = %d\n", buf, len);
52 }
53 return 0;
54 }
执行结果:
client端:
[root@centos epoll]# ./client 6666
000001111122222
read buf = 000001111122222
, len = 5
read buf = 11111, len = 5
read buf = 22222, len = 5
server端:
[root@centos epoll]# ./et_epoll 6666
Start accept ......
================== epoll_wait =============
New Client IP: 127.0.0.1, Port: 54080
================== epoll_wait =============
00000================== epoll_wait =============
11111================== epoll_wait =============
22222
执行结果分析:可以看出,当客户端发送数据(000001111122222)到server端(接收数据缓冲区内),但是由于server端一次只接受5个字节(00000),因此在接受完5个字节之后,将接收的5个字节数据发回给客户端,程序又会在epoll_wait处阻塞等待。当有新数据再次发送过来,则会将上一次缓冲区中剩余的数据(11111)读取并发送给客户端,如此最后将(22222)发送给客户端。
- 点赞
- 收藏
- 关注作者
评论(0)