【Linux C编程】第十八章 高并发服务器(二)
select实现server伪代码
1 int main()
2 {
3 int lfd = socket();
4 bind();
5 listen();
6
7 // 创建一文件描述符表
8 fd_st reads, temp;
9 // 初始化
10 fd_zero(&reads);
11 // 监听的lfd加入到读集合
12 fd_set(lfd, &reads);
13 int maxfd = lfd;
14
15 while(1)
16 {
17 // 委托检测
18 temp = reads;
19 int ret = select(maxfd+1, &temp, NULL, NULL, NULL);
20
21 // 是不是监听的
22 if(fd_isset(lfd, &temp))
23 {
24 // 接受新连接
25 int cfd = accept();
26 // cfd加入读集合
27 fd_set(cfd, &reads);
28 // 更新maxfd
29 maxfd=maxfd<cfd ? cfd:maxfd;
30 }
31 // 客户端发送数据
32 for(int i=lfd+1; i<=maxfd; ++i)
33 {
34 if(fd_isset(i, &temp)
35 {
36 int len = read();
37 if(len == 0)
38 {
39 // cfd 从读集合中del
40 fd_clr(i, &reads);
41 }
42 write();
43 }
44 }
45 }
46 }
(2)使用select函的优缺点:
- 优点:
跨平台
- 缺点:
a. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大;
b. 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大;
c. select支持的文件描述符数量太小了,默认是1024。
为什么是1024?
首先,看下内核中对fd_set的定义:
typedef struct {
unsigned long fds_bits[__FDSET_LONGS];
} __kernel_fd_set;
typedef __kernel_fd_set fd_set;
其中有关的常量定义为:
#undef __NFDBITS
#define __NFDBITS (8 * sizeof(unsigned long))
#undef __FD_SETSIZE
#define __FD_SETSIZE 1024
#undef __FDSET_LONGS
#define __FDSET_LONGS (__FD_SETSIZE/__NFDBITS)
即__NFDBITS为8*4=32,__FD_SETSIZE为1024,那么,__FDSET_LONGS为1024/32=32,因此,fd_set实际上是32个无符号长整形,也就是1024位
(2)select函数及示例
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态
readfds: 监控有读数据到达文件描述符集合,传入传出参数
writefds: 监控写数据到达文件描述符集合,传入传出参数
exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数
timeout: 定时阻塞监控时间,3种情况
1.NULL,永远等下去
2.设置timeval,等待固定时间
3.设置timeval里时间均为0,检查描述字后立即返回,轮询
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0
int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
select示例:
select.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <string.h>
6 #include <sys/socket.h>
7 #include <arpa/inet.h>
8 #include <ctype.h>
9
10
11 int main(int argc, const char* argv[])
12 {
13 if(argc < 2)
14 {
15 printf("eg: ./a.out port\n");
16 exit(1);
17 }
18 struct sockaddr_in serv_addr;
19 socklen_t serv_len = sizeof(serv_addr);
20 int port = atoi(argv[1]);
21
22 // 创建套接字
23 int lfd = socket(AF_INET, SOCK_STREAM, 0);
24 // 初始化服务器 sockaddr_in
25 memset(&serv_addr, 0, serv_len);
26 serv_addr.sin_family = AF_INET; // 地址族
27 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP
28 serv_addr.sin_port = htons(port); // 设置端口
29 // 绑定IP和端口
30 bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
31
32 // 设置同时监听的最大个数
33 listen(lfd, 36);
34 printf("Start accept ......\n");
35
36 struct sockaddr_in client_addr;
37 socklen_t cli_len = sizeof(client_addr);
38
39 // 最大的文件描述符
40 int maxfd = lfd;
41 // 文件描述符读集合
42 fd_set reads, temp;
43 // init
44 FD_ZERO(&reads);
45 FD_SET(lfd, &reads);
46
47 while(1)
48 {
49 // 委托内核做IO检测
50 temp = reads;
51 int ret = select(maxfd+1, &temp, NULL, NULL, NULL);
52 if(ret == -1)
53 {
54 perror("select error");
55 exit(1);
56 }
57 // 客户端发起了新的连接
58 if(FD_ISSET(lfd, &temp))
59 {
60 // 接受连接请求 - accept不阻塞
61 int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
62 if(cfd == -1)
63 {
64 perror("accept error");
65 exit(1);
66 }
67 char ip[64];
68 printf("new client IP: %s, Port: %d\n",
69 inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),
70 ntohs(client_addr.sin_port));
71 // 将cfd加入到待检测的读集合中 - 下一次就可以检测到了
72 FD_SET(cfd, &reads);
73 // 更新最大的文件描述符
74 maxfd = maxfd < cfd ? cfd : maxfd;
75 }
76 // 已经连接的客户端有数据到达
77 for(int i=lfd+1; i<=maxfd; ++i)
78 {
79 if(FD_ISSET(i, &temp))
80 {
81 char buf[1024] = {0};
82 int len = recv(i, buf, sizeof(buf), 0);
83 if(len == -1)
84 {
85 perror("recv error");
86 exit(1);
87 }
88 else if(len == 0)
89 {
90 printf("客户端已经断开了连接\n");
91 close(i);
92 // 从读集合中删除
93 FD_CLR(i, &reads);
94 }
95 else
96 {
97 printf("recv buf: %s\n", buf);
98 send(i, buf, strlen(buf)+1, 0);
99 }
100 }
101 }
102 }
103
104 close(lfd);
105 return 0;
106 }
select示例2:
select.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <string.h>
6 #include <sys/socket.h>
7 #include <arpa/inet.h>
8 #include <ctype.h>
9 #include <sys/select.h>
10
11 #define SERV_PORT 8989
12
13 int main(int argc, const char* argv[])
14 {
15 int lfd, cfd;
16 struct sockaddr_in serv_addr, clien_addr;
17 int serv_len, clien_len;
18
19 // 创建套接字
20 lfd = socket(AF_INET, SOCK_STREAM, 0);
21 // 初始化服务器 sockaddr_in
22 memset(&serv_addr, 0, sizeof(serv_addr));
23 serv_addr.sin_family = AF_INET; // 地址族
24 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP
25 serv_addr.sin_port = htons(SERV_PORT); // 设置端口
26 serv_len = sizeof(serv_addr);
27 // 绑定IP和端口
28 bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
29
30 // 设置同时监听的最大个数
31 listen(lfd, 36);
32 printf("Start accept ......\n");
33
34 int ret;
35 int maxfd = lfd;
36 // reads 实时更新,temps 内核检测
37 fd_set reads, temps;
38
39 FD_ZERO(&reads);
40 FD_SET(lfd, &reads);
41
42 while(1)
43 {
44 temps = reads;
45 ret = select(maxfd+1, &temps, NULL, NULL, NULL);
46 if(ret == -1)
47 {
48 perror("select error");
49 exit(1);
50 }
51
52
53 // 判断是否有新连接
54 if(FD_ISSET(lfd, &temps))
55 {
56 // 接受连接请求
57 clien_len = sizeof(clien_len);
58 int cfd = accept(lfd, (struct sockaddr*)&clien_addr, &clien_len);
59
60 // 文件描述符放入检测集合
61 FD_SET(cfd, &reads);
62 // 更新最大文件描述符
63 maxfd = maxfd < cfd ? cfd : maxfd;
64 }
65
66 // 遍历检测的文件描述符是否有读操作
67 for(int i=lfd+1; i<=maxfd; ++i)
68 {
69 if(FD_ISSET(i, &temps))
70 {
71 // 读数据
72 char buf[1024] = {0};
73 int len = read(i, buf, sizeof(buf));
74 if(len == -1)
75 {
76 perror("read error");
77 exit(1);
78 }
79 else if(len == 0)
80 {
81 // 对方关闭了连接
82 FD_CLR(i, &reads);
83 close(i);
84 if(maxfd == i)
85 {
86 maxfd--;
87 }
88 }
89 else
90 {
91 printf("read buf = %s\n", buf);
92 for(int j=0; j<len; ++j)
93 {
94 buf[j] = toupper(buf[j]);
95 }
96 printf("--buf toupper: %s\n", buf);
97 write(i, buf, strlen(buf)+1);
98 }
99 }
100 }
101 }
102
103 close(lfd);
104 return 0;
105 }
select_plus.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <string.h>
6 #include <sys/socket.h>
7 #include <arpa/inet.h>
8 #include <ctype.h>
9 #include <sys/select.h>
10
11 #define SERV_PORT 8989
12
13 int main(int argc, const char* argv[])
14 {
15 int lfd, cfd;
16 struct sockaddr_in serv_addr, clien_addr;
17 int serv_len, clien_len;
18
19 // 创建套接字
20 lfd = socket(AF_INET, SOCK_STREAM, 0);
21 // 初始化服务器 sockaddr_in
22 memset(&serv_addr, 0, sizeof(serv_addr));
23 serv_addr.sin_family = AF_INET; // 地址族
24 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP
25 serv_addr.sin_port = htons(SERV_PORT); // 设置端口
26 serv_len = sizeof(serv_addr);
27 // 绑定IP和端口
28 bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
29
30 // 设置同时监听的最大个数
31 listen(lfd, 36);
32 printf("Start accept ......\n");
33
34 int ret;
35 int maxfd = lfd;
36 // reads 实时更新,temps 内核检测
37 fd_set reads, temps;
38
39 /*===============================================================*/
40 // 记录要检测的文件描述符的数组
41 int allfd[FD_SETSIZE]; // 1024
42 // 记录数组中最后一个元素的下标
43 int last_index = 0;
44 // 初始化数组
45 for(int i=0; i<FD_SETSIZE; ++i)
46 {
47 allfd[i] = -1; // 无效文件描述符值
48 }
49 allfd[0] = lfd; // 监听的文件描述符添加到数组中
50 /*===============================================================*/
51
52 // 初始化监听的读集合
53 FD_ZERO(&reads);
54 FD_SET(lfd, &reads);
55
56 while(1)
57 {
58 // 每次都需要更新,否则select不会重新检测
59 temps = reads;
60 ret = select(maxfd+1, &temps, NULL, NULL, NULL);
61 if(ret == -1)
62 {
63 perror("select error");
64 exit(1);
65 }
66
67 int i = 0;
68 char bufip[64];
69 // 判断是否有新连接
70 if(FD_ISSET(lfd, &temps))
71 {
72 // 接受连接请求
73 clien_len = sizeof(clien_len);
74 int cfd = accept(lfd, (struct sockaddr*)&clien_addr, &clien_len);
75 printf("client ip: %s, port: %d\n",
76 inet_ntop(AF_INET, &clien_addr.sin_addr.s_addr, bufip, sizeof(bufip)),
77 ntohs(clien_addr.sin_port));
78
79 // 文件描述符放入检测集合
80 FD_SET(cfd, &reads);
81 // 更新最大文件描述符
82 maxfd = maxfd < cfd ? cfd : maxfd;
83 // cfd添加到检测数组中
84 for(i=0; i<FD_SETSIZE; ++i)
85 {
86 if(allfd[i] == -1)
87 {
88 allfd[i] = cfd;
89 break;
90 }
91 }
92 // 更新数组最后一个有效值下标
93 last_index = last_index < i ? i : last_index;
94 }
95
96 // 遍历检测的文件描述符是否有读操作
97 for(i=lfd+1; i<=maxfd; ++i)
98 {
99 if(FD_ISSET(i, &temps))
100 {
101 // 读数据
102 char buf[1024] = {0};
103 int len = read(i, buf, sizeof(buf));
104 if(len == -1)
105 {
106 perror("read error");
107 exit(1);
108 }
109 else if(len == 0)
110 {
111 // 对方关闭了连接
112 FD_CLR(i, &reads);
113 close(i);
114 if(maxfd == i)
115 {
116 maxfd--;
117 }
118 allfd[i] = -1;
119 printf("对方已经关闭了连接。。。。。。\n");
120 }
121 else
122 {
123 printf("read buf = %s\n", buf);
124 for(int j=0; j<len; ++j)
125 {
126 buf[j] = toupper(buf[j]);
127 }
128 printf("--buf toupper: %s\n", buf);
129 write(i, buf, strlen(buf)+1);
130 }
131 }
132 }
133 }
134
135 close(lfd);
136 return 0;
137 }
补充 pselect:
pselect原型如下。此模型应用较少,可参考select模型自行编写C/S:
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);
struct timespec {
long tv_sec; /* seconds */
long tv_nsec; /* nanoseconds */
};
用sigmask替代当前进程的阻塞信号集,调用返回后还原原有阻塞信号集
3. poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 监控的事件 */
short revents; /* 监控事件中满足条件返回的事件 */
};
POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBAND
POLLRDNORM 数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级可读数据
POLLOUT 普通或带外数据可写
POLLWRNORM 数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件
fds 数组地址
nfds 监控数组中有多少文件描述符需要被监控,数组的最大长度, 数组中最后一个使用的元素下标+1,内核会轮询检测fd数组的每个文件描述符
timeout 毫秒级等待
-1:阻塞等,#define INFTIM -1 Linux中没有定义此宏
0:立即返回,不阻塞进程
>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值 返回值: IO发送变化的文件描述符的个数
如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0。
示例(使用poll实现的server):
poll.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <string.h>
6 #include <sys/socket.h>
7 #include <arpa/inet.h>
8 #include <ctype.h>
9 #include <poll.h>
10
11 #define SERV_PORT 8989
12
13 int main(int argc, const char* argv[])
14 {
15 int lfd, cfd;
16 struct sockaddr_in serv_addr, clien_addr;
17 int serv_len, clien_len;
18
19 // 创建套接字
20 lfd = socket(AF_INET, SOCK_STREAM, 0);
21 // 初始化服务器 sockaddr_in
22 memset(&serv_addr, 0, sizeof(serv_addr));
23 serv_addr.sin_family = AF_INET; // 地址族
24 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP
25 serv_addr.sin_port = htons(SERV_PORT); // 设置端口
26 serv_len = sizeof(serv_addr);
27 // 绑定IP和端口
28 bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
29
30 // 设置同时监听的最大个数
31 listen(lfd, 36);
32 printf("Start accept ......\n");
33
34 // poll结构体
35 struct pollfd allfd[1024];
36 int max_index = 0;
37 // init
38 for(int i=0; i<1024; ++i)
39 {
40 allfd[i].fd = -1;
41 }
42 allfd[0].fd = lfd;
43 allfd[0].events = POLLIN;
44
45 while(1)
46 {
47 int i = 0;
48 int ret = poll(allfd, max_index+1, -1);
49 if(ret == -1)
50 {
51 perror("poll error");
52 exit(1);
53 }
54
55 // 判断是否有连接请求
56 if(allfd[0].revents & POLLIN)
57 {
58 clien_len = sizeof(clien_addr);
59 // 接受连接请求
60 int cfd = accept(lfd, (struct sockaddr*)&clien_addr, &clien_len);
61 printf("============\n");
62
63 // cfd添加到poll数组
64 for(i=0; i<1024; ++i)
65 {
66 if(allfd[i].fd == -1)
67 {
68 allfd[i].fd = cfd;
69 break;
70 }
71 }
72 // 更新最后一个元素的下标
73 max_index = max_index < i ? i : max_index;
74 }
75
76 // 遍历数组
77 for(i=1; i<=max_index; ++i)
78 {
79 int fd = allfd[i].fd;
80 if(fd == -1)
81 {
82 continue;
83 }
84 if(allfd[i].revents & POLLIN)
85 {
86 // 接受数据
87 char buf[1024] = {0};
88 int len = recv(fd, buf, sizeof(buf), 0);
89 if(len == -1)
90 {
91 perror("recv error");
92 exit(1);
93 }
94 else if(len == 0)
95 {
96 allfd[i].fd = -1;
97 close(fd);
98 printf("客户端已经主动断开连接。。。\n");
99 }
100 else
101 {
102 printf("recv buf = %s\n", buf);
103 for(int k=0; k<len; ++k)
104 {
105 buf[k] = toupper(buf[k]);
106 }
107 printf("buf toupper: %s\n", buf);
108 send(fd, buf, strlen(buf)+1, 0);
109 }
110
111 }
112
113 }
114 }
115
116 close(lfd);
117 return 0;
118 }
poll与select的比较:
- 两者其实没有大的变化,主要是poll没有select对于1024的限制,由于内部实现是通过链表来实现的,因此理论上没有限制。但是两者最大的缺点还是内核会轮询检测fd数组的每个文件描述符。
补充 ppoll:
GNU定义了ppoll(非POSIX标准),可以支持设置信号屏蔽字,可参考poll模型自行实现C/S。
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <poll.h>
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *timeout_ts, const sigset_t *sigmask);
4. epoll
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者
每次等待事件之前都必须重新准备要被侦听的文件描述符集合(用户态和内核态共享同一片文件描述符表内存),另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那
些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
目前epell是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提
高应用程序效率。
可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max
如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制。
* soft nofile 65536
* hard nofile 100000
基础API
1)创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
#include <sys/epoll.h>
int epoll_create(int size) size:监听数目, epoll上能关注的最大描述符数
2)控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd: 为epoll_creat的句柄
op: 表示动作,用3个宏来表示:
EPOLL_CTL_ADD (注册新的fd到epfd),
EPOLL_CTL_MOD (修改已经注册的fd的监听事件),
EPOLL_CTL_DEL (从epfd删除一个fd);
event: 告诉内核需要监听的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT: 表示对应的文件描述符可以写
EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR: 表示对应的文件描述符发生错误
EPOLLHUP: 表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3)等待所监控文件描述符上有事件的产生,类似于select()调用。
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
events: 用来存内核得到事件的集合,用于回传待处理事件的数组
maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size
timeout: 是超时时间
-1: 阻塞
0: 立即返回,非阻塞
>0: 指定毫秒
返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
- 点赞
- 收藏
- 关注作者
评论(0)