【Linux C编程】第二十二章节Linux线程池实现
【摘要】 线程池的实现过程
一、线程池实现原理
1. 管理者线程
(1)计算线程不够用
- 创建线程
(2) 空闲线程太多
a. 销毁
- 更新要销毁的线程个数
- 通过条件变量完成的
b. 如果空闲太多,任务不够
- 线程阻塞在该条件变量上
c. 发送信号
- pthread_cond_signal
2. 线程池中的线程
(1)从任务队列中取数据
- 任务队列任务
- 执行任务
(2)销毁空闲的线程
- 让线程执行pthread_exit
- 阻塞空闲的线程收到信号:
解除阻塞
只有一个往下执行
在执行任务之前做了销毁操作 -- 自行退出
二、线程池代码实现
1. 初始化一些线程
2. 需要有一个管理者线程
a. 如果使用率超过一定的百分比
创建线程: 按照一定的步长增长
b. 空闲的线程增多
销毁线程
留下的比实际多一些
3. 线程工作的时候
处理数据的时候:
- 互斥锁
- 条件变量
注意:
a. 线程阻塞条件:
任务对列如果为空 cond_empty
pthread_cond_wait(&cond_empty, &mutex);
b. 任务队列中有数据:
激活阻塞在条件变量上的线程:
pthread_cond_signal(&cond_empty);
pthead_cond_broadcast(&cond_empty);
最终代码:
threadpool.c
1 #include <stdlib.h>
2 #include <pthread.h>
3 #include <unistd.h>
4 #include <assert.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <signal.h>
8 #include <errno.h>
9 #include "threadpool.h"
10
11 #define DEFAULT_TIME 10 /*10s检测一次*/
12 #define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
13 #define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
14 #define true 1
15 #define false 0
16
17 typedef struct {
18 void *(*function)(void *); /* 函数指针,回调函数 */
19 void *arg; /* 上面函数的参数 */
20 } threadpool_task_t; /* 各子线程任务结构体 */
21
22 /* 描述线程池相关信息 */
23 struct threadpool_t {
24 pthread_mutex_t lock; /* 用于锁住本结构体 */
25 pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */
26 pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
27 pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */
28
29 pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
30 pthread_t adjust_tid; /* 存管理线程tid */
31 threadpool_task_t *task_queue; /* 任务队列 */
32
33 int min_thr_num; /* 线程池最小线程数 */
34 int max_thr_num; /* 线程池最大线程数 */
35 int live_thr_num; /* 当前存活线程个数 */
36 int busy_thr_num; /* 忙状态线程个数 */
37 int wait_exit_thr_num; /* 要销毁的线程个数 */
38
39 int queue_front; /* task_queue队头下标 */
40 int queue_rear; /* task_queue队尾下标 */
41 int queue_size; /* task_queue队中实际任务数 */
42 int queue_max_size; /* task_queue队列可容纳任务数上限 */
43
44 int shutdown; /* 标志位,线程池使用状态,true或false */
45 };
46
47 /**
48 * @function void *threadpool_thread(void *threadpool)
49 * @desc the worker thread
50 * @param threadpool the pool which own the thread
51 */
52 void *threadpool_thread(void *threadpool);
53
54 /**
55 * @function void *adjust_thread(void *threadpool);
56 * @desc manager thread
57 * @param threadpool the threadpool
58 */
59 void *adjust_thread(void *threadpool);
60
61 /**
62 * check a thread is alive
63 */
64 int is_thread_alive(pthread_t tid);
65 int threadpool_free(threadpool_t *pool);
66
67 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
68 {
69 int i;
70 threadpool_t *pool = NULL;
71 do {
72 if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
73 printf("malloc threadpool fail");
74 break;/*跳出do while*/
75 }
76
77 pool->min_thr_num = min_thr_num;
78 pool->max_thr_num = max_thr_num;
79 pool->busy_thr_num = 0;
80 pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */
81 pool->queue_size = 0; /* 有0个产品 */
82 pool->queue_max_size = queue_max_size;
83 pool->queue_front = 0;
84 pool->queue_rear = 0;
85 pool->shutdown = false; /* 不关闭线程池 */
86
87 /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
88 pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
89 if (pool->threads == NULL) {
90 printf("malloc threads fail");
91 break;
92 }
93 memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
94
95 /* 队列开辟空间 */
96 pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
97 if (pool->task_queue == NULL) {
98 printf("malloc task_queue fail");
99 break;
100 }
101
102 /* 初始化互斥琐、条件变量 */
103 if (pthread_mutex_init(&(pool->lock), NULL) != 0
104 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
105 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
106 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
107 {
108 printf("init the lock or cond fail");
109 break;
110 }
111
112 /* 启动 min_thr_num 个 work thread */
113 for (i = 0; i < min_thr_num; i++) {
114 pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
115 printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
116 }
117 pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
118
119 return pool;
120
121 } while (0);
122
123 threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */
124
125 return NULL;
126 }
127
128 /* 向线程池中 添加一个任务 */
129 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
130 {
131 pthread_mutex_lock(&(pool->lock));
132
133 /* ==为真,队列已经满, 调wait阻塞 */
134 while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
135 pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
136 }
137 if (pool->shutdown) {
138 pthread_mutex_unlock(&(pool->lock));
139 }
140
141 /* 清空 工作线程 调用的回调函数 的参数arg */
142 if (pool->task_queue[pool->queue_rear].arg != NULL) {
143 free(pool->task_queue[pool->queue_rear].arg);
144 pool->task_queue[pool->queue_rear].arg = NULL;
145 }
146 /*添加任务到任务队列里*/
147 pool->task_queue[pool->queue_rear].function = function;
148 pool->task_queue[pool->queue_rear].arg = arg;
149 pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */
150 pool->queue_size++;
151
152 /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
153 pthread_cond_signal(&(pool->queue_not_empty));
154 pthread_mutex_unlock(&(pool->lock));
155
156 return 0;
157 }
158
159 /* 线程池中各个工作线程 */
160 void *threadpool_thread(void *threadpool)
161 {
162 threadpool_t *pool = (threadpool_t *)threadpool;
163 threadpool_task_t task;
164
165 while (true) {
166 /* Lock must be taken to wait on conditional variable */
167 /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
168 pthread_mutex_lock(&(pool->lock));
169
170 /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
171 while ((pool->queue_size == 0) && (!pool->shutdown)) {
172 printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
173 pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
174
175 /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
176 if (pool->wait_exit_thr_num > 0) {
177 pool->wait_exit_thr_num--;
178
179 /*如果线程池里线程个数大于最小值时可以结束当前线程*/
180 if (pool->live_thr_num > pool->min_thr_num) {
181 printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
182 pool->live_thr_num--;
183 pthread_mutex_unlock(&(pool->lock));
184 pthread_exit(NULL);
185 }
186 }
187 }
188
189 /*如果指定了true,要关闭线程池里的每个线程,自行退出处理*/
190 if (pool->shutdown) {
191 pthread_mutex_unlock(&(pool->lock));
192 printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
193 pthread_exit(NULL); /* 线程自行结束 */
194 }
195
196 /*从任务队列里获取任务, 是一个出队操作*/
197 task.function = pool->task_queue[pool->queue_front].function;
198 task.arg = pool->task_queue[pool->queue_front].arg;
199
200 pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */
201 pool->queue_size--;
202
203 /*通知可以有新的任务添加进来*/
204 pthread_cond_broadcast(&(pool->queue_not_full));
205
206 /*任务取出后,立即将 线程池琐 释放*/
207 pthread_mutex_unlock(&(pool->lock));
208
209 /*执行任务*/
210 printf("thread 0x%x start working\n", (unsigned int)pthread_self());
211 pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/
212 pool->busy_thr_num++; /*忙状态线程数+1*/
213 pthread_mutex_unlock(&(pool->thread_counter));
214 (*(task.function))(task.arg); /*执行回调函数任务*/
215 //task.function(task.arg); /*执行回调函数任务*/
216
217 /*任务结束处理*/
218 printf("thread 0x%x end working\n", (unsigned int)pthread_self());
219 pthread_mutex_lock(&(pool->thread_counter));
220 pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
221 pthread_mutex_unlock(&(pool->thread_counter));
222 }
223
224 pthread_exit(NULL);
225 }
226
227 /* 管理线程 */
228 void *adjust_thread(void *threadpool)
229 {
230 int i;
231 threadpool_t *pool = (threadpool_t *)threadpool;
232 while (!pool->shutdown) {
233
234 sleep(DEFAULT_TIME); /*定时 对线程池管理*/
235
236 pthread_mutex_lock(&(pool->lock));
237 int queue_size = pool->queue_size; /* 关注 任务数 */
238 int live_thr_num = pool->live_thr_num; /* 存活 线程数 */
239 pthread_mutex_unlock(&(pool->lock));
240
241 pthread_mutex_lock(&(pool->thread_counter));
242 int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */
243 pthread_mutex_unlock(&(pool->thread_counter));
244
245 /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
246 if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
247 pthread_mutex_lock(&(pool->lock));
248 int add = 0;
249
250 /*一次增加 DEFAULT_THREAD 个线程*/
251 for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
252 && pool->live_thr_num < pool->max_thr_num; i++) {
253 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
254 pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
255 add++;
256 pool->live_thr_num++;
257 }
258 }
259
260 pthread_mutex_unlock(&(pool->lock));
261 }
262
263 /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
264 if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
265
266 /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
267 pthread_mutex_lock(&(pool->lock));
268 pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */
269 pthread_mutex_unlock(&(pool->lock));
270
271 for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
272 /* 通知处在空闲状态的线程, 他们会自行终止*/
273 pthread_cond_signal(&(pool->queue_not_empty));
274 }
275 }
276 }
277
278 return NULL;
279 }
280
281 int threadpool_destroy(threadpool_t *pool)
282 {
283 int i;
284 if (pool == NULL) {
285 return -1;
286 }
287 pool->shutdown = true;
288
289 /*先销毁管理线程*/
290 pthread_join(pool->adjust_tid, NULL);
291
292 for (i = 0; i < pool->live_thr_num; i++) {
293 /*通知所有的空闲线程*/
294 pthread_cond_broadcast(&(pool->queue_not_empty));
295 }
296 for (i = 0; i < pool->live_thr_num; i++) {
297 pthread_join(pool->threads[i], NULL);
298 }
299 threadpool_free(pool);
300
301 return 0;
302 }
303
304 int threadpool_free(threadpool_t *pool)
305 {
306 if (pool == NULL) {
307 return -1;
308 }
309
310 if (pool->task_queue) {
311 free(pool->task_queue);
312 }
313 if (pool->threads) {
314 free(pool->threads);
315 pthread_mutex_lock(&(pool->lock));
316 pthread_mutex_destroy(&(pool->lock));
317 pthread_mutex_lock(&(pool->thread_counter));
318 pthread_mutex_destroy(&(pool->thread_counter));
319 pthread_cond_destroy(&(pool->queue_not_empty));
320 pthread_cond_destroy(&(pool->queue_not_full));
321 }
322 free(pool);
323 pool = NULL;
324
325 return 0;
326 }
327
328 int threadpool_all_threadnum(threadpool_t *pool)
329 {
330 int all_threadnum = -1;
331 pthread_mutex_lock(&(pool->lock));
332 all_threadnum = pool->live_thr_num;
333 pthread_mutex_unlock(&(pool->lock));
334 return all_threadnum;
335 }
336
337 int threadpool_busy_threadnum(threadpool_t *pool)
338 {
339 int busy_threadnum = -1;
340 pthread_mutex_lock(&(pool->thread_counter));
341 busy_threadnum = pool->busy_thr_num;
342 pthread_mutex_unlock(&(pool->thread_counter));
343 return busy_threadnum;
344 }
345
346 int is_thread_alive(pthread_t tid)
347 {
348 int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活
349 if (kill_rc == ESRCH) {
350 return false;
351 }
352
353 return true;
354 }
355
356 /*测试*/
357
358 #if 1
359 /* 线程池中的线程,模拟处理业务 */
360 void *process(void *arg)
361 {
362 printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),*(int *)arg);
363 sleep(1);
364 printf("task %d is end\n",*(int *)arg);
365
366 return NULL;
367 }
368 int main(void)
369 {
370 /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
371
372 threadpool_t *thp = threadpool_create(3,100,100);/*创建线程池,池里最小3个线程,最大100,队列最大100*/
373 printf("pool inited");
374
375 //int *num = (int *)malloc(sizeof(int)*20);
376 int num[20], i;
377 for (i = 0; i < 20; i++) {
378 num[i]=i;
379 printf("add task %d\n",i);
380 threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */
381 }
382 sleep(10); /* 等子线程完成任务 */
383 threadpool_destroy(thp);
384
385 return 0;
386 }
387
388 #endif
threadpool.h
1 #ifndef __THREADPOOL_H_
2 #define __THREADPOOL_H_
3
4 typedef struct threadpool_t threadpool_t;
5
6 /**
7 * @function threadpool_create
8 * @descCreates a threadpool_t object.
9 * @param thr_num thread num
10 * @param max_thr_num max thread size
11 * @param queue_max_size size of the queue.
12 * @return a newly created thread pool or NULL
13 */
14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
15
16 /**
17 * @function threadpool_add
18 * @desc add a new task in the queue of a thread pool
19 * @param pool Thread pool to which add the task.
20 * @param function Pointer to the function that will perform the task.
21 * @param argument Argument to be passed to the function.
22 * @return 0 if all goes well,else -1
23 */
24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
25
26 /**
27 * @function threadpool_destroy
28 * @desc Stops and destroys a thread pool.
29 * @param pool Thread pool to destroy.
30 * @return 0 if destory success else -1
31 */
32 int threadpool_destroy(threadpool_t *pool);
33
34 /**
35 * @desc get the thread num
36 * @pool pool threadpool
37 * @return # of the thread
38 */
39 int threadpool_all_threadnum(threadpool_t *pool);
40
41 /**
42 * desc get the busy thread num
43 * @param pool threadpool
44 * return # of the busy thread
45 */
46 int threadpool_busy_threadnum(threadpool_t *pool);
47
48 #endif
makefile
1 src = $(wildcard *.c)
2 targets = $(patsubst %.c, %, $(src))
3
4 CC = gcc
5 CFLAGS = -lpthread -Wall -g
6
7 all:$(targets)
8
9 $(targets):%:%.c
10 $(CC) $< -o $@ $(CFLAGS)
11
12 .PHONY:clean all
13 clean:
14 -rm -rf $(targets)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)