【Linux C编程】第十八章 高并发服务器(三)

举报
Yuchuan 发表于 2021/05/12 17:09:15 2021/05/12
【摘要】 高并发服务器

 epoll工作原理:

通过下面的伪代码有助于上面的理解:

epoll伪代码

1 int main()
 2 {
 3     // 创建监听的套接字
 4     int lfd = socket();
 5     // 绑定
 6     bind();
 7     // 监听
 8     listen();
 9     
10     // epoll树根节点
11     int epfd = epoll_create(3000);
12     // 存储发送变化的fd对应信息
13     struct epoll_event all[3000];
14     // init
15     // 监听的lfd挂到epoll树上
16     struct epoll_event ev;
17     // 在ev中init lfd信息
18     ev.events = EPOLLIN ;
19     ev.data.fd = lfd;
20     epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
21     while(1)
22     {
23         // 委托内核检测事件
24         int ret = epoll_wait(epfd, all, 3000, -1);
25         // 根据ret遍历all数组
26         for(int i=0; i<ret; ++i)
27         {
28             int fd = all[i].data.fd;
29             // 有新的连接
30             if( fd == lfd)
31             {
32                 // 接收连接请求 - accept不阻塞
33                 int cfd = accept();
34                 // cfd上树
35                 ev.events = EPOLLIN;
36                 ev.data.fd = cfd;
37                 epoll_ctl(epfd, epoll_ctl_add, cfd, &ev);
38             }
39             // 已经连接的客户端有数据发送过来
40             else
41             {
42                 // 只处理客户端发来的数据
43                 if(!all[i].events & EPOLLIN)
44                 {
45                     continue;
46                 }
47                 // 读数据
48                 int len = recv();
49                 if(len == 0)
50                 {
51                     close(fd);
52                     // 检测的fd从树上删除
53                     epoll_ctl(epfd, epoll_ctl_del, fd, NULL);
54                 }
55                 // 写数据
56                 send();
57             }
58         }
59     }
60 }

示例:

epoll.c

1 #include <stdio.h>
  2 #include <unistd.h>
  3 #include <stdlib.h>
  4 #include <sys/types.h>
  5 #include <string.h>
  6 #include <sys/socket.h>
  7 #include <arpa/inet.h>
  8 #include <ctype.h>
  9 #include <sys/epoll.h>
 10 
 11 
 12 int main(int argc, const char* argv[])
 13 {
 14     if(argc < 2)
 15     {
 16         printf("eg: ./a.out port\n");
 17         exit(1);
 18     }
 19     struct sockaddr_in serv_addr;
 20     socklen_t serv_len = sizeof(serv_addr);
 21     int port = atoi(argv[1]);
 22 
 23     // 创建套接字
 24     int lfd = socket(AF_INET, SOCK_STREAM, 0);
 25     // 初始化服务器 sockaddr_in 
 26     memset(&serv_addr, 0, serv_len);
 27     serv_addr.sin_family = AF_INET;                   // 地址族 
 28     serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);    // 监听本机所有的IP
 29     serv_addr.sin_port = htons(port);            // 设置端口 
 30     // 绑定IP和端口
 31     bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
 32 
 33     // 设置同时监听的最大个数
 34     listen(lfd, 36);
 35     printf("Start accept ......\n");
 36 
 37     struct sockaddr_in client_addr;
 38     socklen_t cli_len = sizeof(client_addr);
 39 
 40     // 创建epoll树根节点
 41     int epfd = epoll_create(2000);
 42     // 初始化epoll树
 43     struct epoll_event ev;
 44     ev.events = EPOLLIN;
 45     ev.data.fd = lfd;
 46     epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
 47 
 48     struct epoll_event all[2000];
 49     while(1)
 50     {
 51         // 使用epoll通知内核fd 文件IO检测 sizeof(all)/sizeof(all[0]) --> sizeof(struct epoll_event)
 52         int ret = epoll_wait(epfd, all, sizeof(all)/sizeof(all[0]), -1);
 53 
 54         // 遍历all数组中的前ret个元素
 55         for(int i=0; i<ret; ++i)
 56         {
 57             int fd = all[i].data.fd;
 58             // 判断是否有新连接
 59             if(fd == lfd)
 60             {
 61                 // 接受连接请求
 62                 int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
 63                 if(cfd == -1)
 64                 {
 65                     perror("accept error");
 66                     exit(1);
 67                 }
 68                 // 将新得到的cfd挂到树上
 69                 struct epoll_event temp;
 70                 temp.events = EPOLLIN;
 71                 temp.data.fd = cfd;
 72                 epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &temp);
 73                 
 74                 // 打印客户端信息
 75                 char ip[64] = {0};
 76                 printf("New Client IP: %s, Port: %d\n",
 77                     inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),
 78                     ntohs(client_addr.sin_port));
 79                 
 80             }
 81             else
 82             {
 83                 // 处理已经连接的客户端发送过来的数据
 84                 if(!all[i].events & EPOLLIN) 
 85                 {
 86                     continue;
 87                 }
 88 
 89                 // 读数据
 90                 char buf[1024] = {0};
 91                 int len = recv(fd, buf, sizeof(buf), 0);
 92                 if(len == -1)
 93                 {
 94                     perror("recv error");
 95                     exit(1);
 96                 }
 97                 else if(len == 0)
 98                 {
 99                     printf("client disconnected ....\n");
100                     // fd从epoll树上删除
101                     ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
102                     if(ret == -1)
103                     {
104                         perror("epoll_ctl - del error");
105                         exit(1);
106                     }
107                     close(fd);
108                 }
109                 else
110                 {
111                     printf(" recv buf: %s\n", buf);
112                     write(fd, buf, len);
113                 }
114             }
115         }
116     }
117 
118     close(lfd);
119     return 0;
120 }

client.c

1 /* client.c */
 2 #include <stdio.h>
 3 #include <string.h>
 4 #include <unistd.h>
 5 #include <netinet/in.h>
 6 #include "wrap.h"
 7 
 8 #define MAXLINE 80
 9 #define SERV_PORT 6666
10 
11 int main(int argc, char *argv[])
12 {
13     struct sockaddr_in servaddr;
14     char buf[MAXLINE];
15     int sockfd, n;
16 
17     sockfd = Socket(AF_INET, SOCK_STREAM, 0);
18 
19     bzero(&servaddr, sizeof(servaddr));
20     servaddr.sin_family = AF_INET;
21     inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
22     servaddr.sin_port = htons(SERV_PORT);
23 
24     Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
25 
26     while (fgets(buf, MAXLINE, stdin) != NULL) {
27         Write(sockfd, buf, strlen(buf));
28         n = Read(sockfd, buf, MAXLINE);
29         if (n == 0)
30             printf("the other side has been closed.\n");
31         else
32             Write(STDOUT_FILENO, buf, n);
33     }
34 
35     Close(sockfd);
36     return 0;
37 }

    注意:epoll_wait 调用次数越多, 系统的开销越大

四、epoll进阶

 1. 事件模型

      EPOLL事件有两种模型:

      Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。

      Level Triggered (LT) 水平触发只要有数据都会触发。

     思考如下步骤:

     1)假定我们已经把一个用来从管道中读取数据的文件描述符(RFD)添加到epoll描述符。

     2)管道的另一端写入了2KB的数据

     3)调用epoll_wait,并且它会返回RFD,说明它已经准备好读取操作

     4)读取1KB的数据

     5)调用epoll_wait……

     在这个过程中,有两种工作模式:

     (1)ET模式

      ET模式即Edge Triggered工作模式。

      如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在

等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩

余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面

会介绍避免可能的缺陷。

  • 基于非阻塞文件句柄
  • 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的

读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。

     (2)LT模式

      LT模式即Level Triggered工作模式。

     与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。

     LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任

何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

     ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再

为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

2.  实例一

    基于管道epoll ET触发模式

et_epoll.c

1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <sys/epoll.h>
 4 #include <errno.h>
 5 #include <unistd.h>
 6 
 7 #define MAXLINE 10
 8 
 9 int main(int argc, char *argv[])
10 {
11     int efd, i;
12     int pfd[2];
13     pid_t pid;
14     char buf[MAXLINE], ch = 'a';
15 
16     pipe(pfd);
17     pid = fork();
18     if (pid == 0) {
19         close(pfd[0]);
20         while (1) {
21             for (i = 0; i < MAXLINE/2; i++)
22                 buf[i] = ch;
23             buf[i-1] = '\n';
24             ch++;
25 
26             for (; i < MAXLINE; i++)
27                 buf[i] = ch;
28             buf[i-1] = '\n';
29             ch++;
30 
31             write(pfd[1], buf, sizeof(buf));
32             sleep(2);
33         }
34         close(pfd[1]);
35     } else if (pid > 0) {
36         struct epoll_event event;
37         struct epoll_event resevent[10];
38         int res, len;
39         close(pfd[1]);
40 
41         efd = epoll_create(10);
42         /* event.events = EPOLLIN; */
43         event.events = EPOLLIN | EPOLLET;        /* ET 边沿触发 ,默认是水平触发 */
44         event.data.fd = pfd[0];
45     epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);
46 
47         while (1) {
48             res = epoll_wait(efd, resevent, 10, -1);
49             printf("res %d\n", res);
50             if (resevent[0].data.fd == pfd[0]) {
51                 len = read(pfd[0], buf, MAXLINE/2);
52                 write(STDOUT_FILENO, buf, len);
53             }
54         }
55         close(pfd[0]);
56         close(efd);
57     } else {
58         perror("fork");
59         exit(-1);
60     }
61     return 0;
62 }

3. 实例二

    基于网络C/S模型的epoll ET触发模式

et_epoll.c

1 #include <stdio.h>
  2 #include <unistd.h>
  3 #include <stdlib.h>
  4 #include <sys/types.h>
  5 #include <string.h>
  6 #include <sys/socket.h>
  7 #include <arpa/inet.h>
  8 #include <ctype.h>
  9 #include <sys/epoll.h>
 10 
 11 
 12 int main(int argc, const char* argv[])
 13 {
 14     if(argc < 2)
 15     {
 16         printf("eg: ./a.out port\n");
 17         exit(1);
 18     }
 19     struct sockaddr_in serv_addr;
 20     socklen_t serv_len = sizeof(serv_addr);
 21     int port = atoi(argv[1]);
 22 
 23     // 创建套接字
 24     int lfd = socket(AF_INET, SOCK_STREAM, 0);
 25     // 初始化服务器 sockaddr_in 
 26     memset(&serv_addr, 0, serv_len);
 27     serv_addr.sin_family = AF_INET;                   // 地址族 
 28     serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);    // 监听本机所有的IP
 29     serv_addr.sin_port = htons(port);            // 设置端口 
 30     // 绑定IP和端口
 31     bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
 32 
 33     // 设置同时监听的最大个数
 34     listen(lfd, 36);
 35     printf("Start accept ......\n");
 36 
 37     struct sockaddr_in client_addr;
 38     socklen_t cli_len = sizeof(client_addr);
 39 
 40     // 创建epoll树根节点
 41     int epfd = epoll_create(2000);
 42     // 初始化epoll树
 43     struct epoll_event ev;
 44 
 45     // 设置边沿触发
 46     ev.events = EPOLLIN | EPOLLET;
 47     ev.data.fd = lfd;
 48     epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
 49 
 50     struct epoll_event all[2000];
 51     while(1)
 52     {
 53         // 使用epoll通知内核fd 文件IO检测
 54         int ret = epoll_wait(epfd, all, sizeof(all)/sizeof(all[0]), -1);
 55         printf("================== epoll_wait =============\n");
 56 
 57         // 遍历all数组中的前ret个元素
 58         for(int i=0; i<ret; ++i)
 59         {
 60             int fd = all[i].data.fd;
 61             // 判断是否有新连接
 62             if(fd == lfd)
 63             {
 64                 // 接受连接请求
 65                 int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
 66                 if(cfd == -1)
 67                 {
 68                     perror("accept error");
 69                     exit(1);
 70                 }
 71                 // 将新得到的cfd挂到树上
 72                 struct epoll_event temp;
 73                 // 设置边沿触发
 74                 temp.events = EPOLLIN | EPOLLET;
 75                 temp.data.fd = cfd;
 76                 epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &temp);
 77                 
 78                 // 打印客户端信息
 79                 char ip[64] = {0};
 80                 printf("New Client IP: %s, Port: %d\n",
 81                     inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),
 82                     ntohs(client_addr.sin_port));
 83                 
 84             }
 85             else
 86             {
 87                 // 处理已经连接的客户端发送过来的数据
 88                 if(!all[i].events & EPOLLIN) 
 89                 {
 90                     continue;
 91                 }
 92 
 93                 // 读数据
 94                 char buf[5] = {0};
 95                 int len = recv(fd, buf, sizeof(buf), 0);
 96                 if(len == -1)
 97                 {
 98                     perror("recv error");
 99                     exit(1);
100                 }
101                 else if(len == 0)
102                 {
103                     printf("client disconnected ....\n");
104                     // fd从epoll树上删除
105                     ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
106                     if(ret == -1)
107                     {
108                         perror("epoll_ctl - del error");
109                         exit(1);
110                     }
111                     close(fd);                   
112                 }
113                 else
114                 {
115                     // printf(" recv buf: %s\n", buf);
116                     write(STDOUT_FILENO, buf, len);
117                     write(fd, buf, len);
118                 }
119             }
120         }
121     }
122 
123     close(lfd);
124     return 0;
125 }

tcp_client.c

1 #include <stdio.h>
 2 #include <unistd.h>
 3 #include <stdlib.h>
 4 #include <sys/types.h>
 5 #include <sys/stat.h>
 6 #include <string.h>
 7 #include <arpa/inet.h>
 8 #include <fcntl.h>
 9 
10 // tcp client
11 int main(int argc, const char* argv[])
12 {
13     if(argc < 2)
14     {
15         printf("eg: ./a.out port\n");
16         exit(1);
17     }
18     // 创建套接字
19     int fd = socket(AF_INET, SOCK_STREAM, 0);
20     if(fd == -1)
21     {
22         perror("socket error");
23         exit(1);
24     }
25     int port = atoi(argv[1]);
26     // 连接服务器
27     struct sockaddr_in serv_addr;
28     memset(&serv_addr, 0, sizeof(serv_addr));
29     serv_addr.sin_family = AF_INET;
30     serv_addr.sin_port = htons(port);
31     inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr);
32     int ret = connect(fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
33     if(ret == -1)
34     {
35         perror("connect error");
36         exit(1);
37     }
38 
39     // 通信
40     while(1)
41     {
42         // 写数据
43         // 接收键盘输入
44         char buf[512];
45         fgets(buf, sizeof(buf), stdin);
46         // 发送给服务器
47         write(fd, buf, strlen(buf)+1);
48 
49         // 接收服务器端的数据
50         int len = read(fd, buf, sizeof(buf));
51         printf("read buf = %s, len = %d\n", buf, len);
52     }
53     return 0;
54 }

    执行结果:

    client端:

[root@centos epoll]# ./client 6666
000001111122222

read buf = 000001111122222
, len = 5


read buf = 11111, len = 5


read buf = 22222, len = 5

    server端:

[root@centos epoll]# ./et_epoll 6666
Start accept ......
================== epoll_wait =============
New Client IP: 127.0.0.1, Port: 54080
================== epoll_wait =============
00000================== epoll_wait =============
11111================== epoll_wait =============
22222

    执行结果分析:可以看出,当客户端发送数据(000001111122222)到server端(接收数据缓冲区内),但是由于server端一次只接受5个字节(00000),因此在接受完5个字节之后,将接收的5个字节数据发回给客户端,程序又会在epoll_wait处阻塞等待。当有新数据再次发送过来,则会将上一次缓冲区中剩余的数据(11111)读取并发送给客户端,如此最后将(22222)发送给客户端。

【Linux C编程】第十八章 高并发服务器(四)

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。