掌握并行处理:理解并构建自己的线程池

举报
Lion Long 发表于 2023/09/23 20:00:39 2023/09/23
【摘要】 摘要:本文深入探讨了线程池的原理和实现,并提供了一个详细的、分步指南,帮助读者掌握并行处理的核心概念。文章开始介绍了并行处理的重要性,以及为什么线程池是一种有效管理并发任务的技术。接着,文章详细解释了线程池的基本原理,包括线程池的组成结构、线程的生命周期和任务队列的管理方法。随后,文章展示了如何使用C++编程语言实现一个简单的线程池,并介绍了线程安全性和任务调度的关键考虑因素。

一、线程池存在原因

(1)线程使用场景:某类任务特别耗时,会严重影响该线程处理其他任务,因此需要在其他线程异步执行该任务。


(2)线程开销:随着这类任务越来越多,需要异步执行任务而开启的线程也越来越多,但是每个CPU的核心数和线程数是固定,过多的线程并不能提高效率。因此,线程资源的开销与CPU核心之间要平衡选择。

(3)原因:

  • 线程资源的开销与CPU核心之间要平衡选择,自然就产生线程池,即需要固定线程的数量。
  • 从前面的执行流程可以看出,每次耗时任务到来都需要不断的创建线程,执行完成后销毁线程。不断的创建和销毁线程会浪费系统资源。
  • 因此需要有线程池,提前分配固定线程数量,不会将它们关闭,当任务到来时复用这些线程。



(3)线程池的作用:

  1. 复用线程资源。
  2. 充分利用系统资源。
  3. 异步执行耗时任务。
  4. 减少了多个任务(不是一个任务)的执行时间。

二、线程池原理

线程池是一个生产–消费模型。发布任务的线程是生产者,其他线程是消费者。

线程池的构成:

(1)生产者线程:发布任务。
(2)队列:亦称任务队列,存放具体的任务。因为任务是异步执行的,任务的内容就包括了任务的上下文以及任务的执行函数。
(3)线程池:即消费者,是固定数量的线程集合;主要完成取出任务、执行任务、任务调度。

2.1、线程调度

由于任务的密疏程度是未知的,即任务是间歇性的,有时候任务很多,有时候任务很少。当任务很少时,需要将不执行任务的线程休眠,不能让其浪费系统资源。这就需要线程调度。

线程的调度主要通过mutex和condition实现。 即互斥锁和条件变量。
线程有两种状态:从无任务到有任务(从无到有)以及从有任务到无任务(从有到无)。

利用条件变量,从无到有时,唤醒线程;从有到无时,休眠线程。那么如何确定条件呢?就是依据任务队列的状态,如果任务队列中有任务,将线程唤醒;如果任务队列为空,将线程休眠。

仔细点说,就是:

  • 当生产者线程发布任务时,任务队列就有任务,进入 从无到有状态;通知某一个线程唤醒,取出任务、执行任务。
  • 线程判断任务队列是否有任务,如果任务队列为空,则进入 从有到无 状态,condition通知线程休眠。

2.2、平衡选择

线程资源的开销与CPU核心之间做平衡选择;平衡选择依据耗时任务而定。耗时任务分为IO密集型和CPU密集型。

  • IO密集型:IO的操作是同步的,系统调用会阻塞的将内核资源拷贝到用户态或者用户态资源阻塞的将资源拷贝到内核中;线程会阻塞等待系统调用完成。
  • CPU密集型:长时间占用CPU,使线程无法处理其他任务。

根据这两个类型,可以确定线程池的线程数量。一般,CPU密集型的线程池数量等于CPU核心数;IO密集型的线程池线程数量等于2倍核心数+2。
有这样一个公式: (IO等待时间+CPU运算时间)核心数/cpu运算时间。根据公式对线程池数量做优化调整,使其符合特定业务逻辑。

三、实现一个线程池

3.1、接口设计

(1)创建线程池的接口。确定线程池的线程数量以及任务队列的长度。
(2)销毁线程池的接口。线程判断线程池销毁标志,如果标记了线程池销毁,线程退出;并且通知所有线程。
(3)生产者线程抛出任务的接口。目的是构造一个任务,并把任务放到任务队列中,通知线程唤醒。

3.2、代码示例

thread_pool.h

#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H

typedef struct thread_pool_t thread_pool_t;
typedef void(*handler_pt)(void*);

enum THREAD_POOL_ERROR_CODE
{
	THREAD_POOL_NULL = -1,
	THREAD_MUTEX_FAIL = -2,
	THREAD_POOL_CLOSED = -3,
	THREAD_COND_FAIL = -4,
	THREAD_POOL_TASK_QUE_FULL = -5,
	THREAD_COND_SIGNAL_FAIL=-6,
	THREAD_POOL_SUCCESS = 0
};

thread_pool_t *thread_pool_create(int thread_count,int queue_size);

int thread_pool_destory(thread_pool_t *pool);

int thread_pool_post(thread_pool_t *pool,handler_pt func,void* arg);

int wait_pool_done(thread_pool_t *pool);

#endif

thread_pool.c

#include <pthread.h>
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include "thread_pool.h"


typedef struct task_t{
	handler_pt func;
	void *arg;
} task_t;


typedef struct task_queue_t{
	uint32_t head;// 队列头索引
	uint32_t tail;// 队列尾索引
	uint32_t count;//任务数量
	task_t *queue;//队列数组
} task_queue_t;

struct thread_pool_t{
	pthread_mutex_t mutex;
	pthread_cond_t condition;
	pthread_t *threads;
	task_queue_t task_queue;
	
	int closed;//销毁线程池标记
	int started;//当前运行的线程数
	
	int thread_count;
	int queue_size;
	
	
};

static void thread_pool_free(thread_pool_t *pool)
{
	if (pool == NULL || pool->started > 0)
		return;

	if (pool->threads)
	{
		free(pool->threads);
		pool->threads = NULL;

		pthread_mutex_lock(&(pool->mutex));
		pthread_mutex_destroy(&(pool->mutex));
		pthread_cond_destroy(&(pool->condition));
	}

	if (pool->task_queue.queue)
	{
		free(pool->task_queue.queue);
		pool->task_queue.queue = NULL;
	}
	free(pool);
}

static void *thread_worker(void *thread_pool)
{
	thread_pool_t *pool = thread_pool;
	task_queue_t *que;
	task_t task;
	for (;;)
	{
		pthread_mutex_lock(&(pool->mutex));//加锁
		que = &(pool->task_queue);
		// 判断虚假唤醒
		while (que->count == 0 && pool->closed == 0)
		{
			// pthread_mutex_unlock(&(pool->mutex))
			// 阻塞在 condition
			// 唤醒信号===================================
			// 解除阻塞
			// pthread_mutex_lock(&(pool->mutex));
			pthread_cond_wait(&(pool->condition),&(pool->mutex));
		}

		if (pool->closed == 1)
			break;

		task = que->queue[que->head];//取出任务
		que->head = (que->head + 1) % pool->queue_size;
		que->count--;
		pthread_mutex_unlock(&(pool->mutex));
		(*(task.func))(task.arg);
	}
	pool->started--;
	pthread_mutex_unlock(&(pool->mutex));
	pthread_exit(NULL);
	return NULL;
}

thread_pool_t *thread_pool_create(int thread_count,int queue_size)
{
	if(thread_count<=0 || queue_size<=0)
		return NULL;
	thread_pool_t *pool;
	pool=(thread_pool_t *)malloc(sizeof(*pool));

	if(pool==NULL)
		return NULL;
	
	pool->thread_count = 0;//从0开始计数
	pool->queue_size = queue_size;
	pool->task_queue.head = 0;
	pool->task_queue.tail = 0;
	pool->task_queue.count = 0;
	pool->started = pool->closed = 0;

	pool->threads = NULL;
	pool->task_queue.queue = NULL;

	pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
	if (pool->task_queue.queue == NULL)
	{
		//free pool
		thread_pool_free(pool);
		return NULL;
	}
		
	pool->threads = (pthread_t*)malloc(sizeof(pthread_t)* thread_count);
	if (pool->threads == NULL)
	{
		//free pool
		thread_pool_free(pool);
		return NULL;
	}

	int i = 0;
	for (i = 0; i < thread_count; i++)
	{
		if (pthread_create(&(pool->threads[i]), NULL,thread_worker, (void*)pool) != 0)
		{
			//free pool
			thread_pool_free(pool);
			return NULL;
		}
		pool->thread_count++;
		pool->started++;
	}
	return pool;
}

int thread_pool_post(thread_pool_t *pool, handler_pt func, void* arg)
{
	if (pool == NULL || func == NULL)
		return THREAD_POOL_NULL;

	task_queue_t *task_queue = &(pool->task_queue);//取出队列

	if (pthread_mutex_lock(&(pool->mutex)) != 0)
		return THREAD_MUTEX_FAIL;

	if (pool->closed)
	{
		pthread_mutex_unlock(&(pool->mutex));
		return THREAD_POOL_CLOSED;
	}

	if (task_queue->count==pool->queue_size)
	{
		pthread_mutex_unlock(&(pool->mutex));
		return THREAD_POOL_TASK_QUE_FULL;
	}

	task_queue->queue[task_queue->tail].func = func;
	task_queue->queue[task_queue->tail].arg = arg;
	task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
	task_queue->count++;

	if (pthread_cond_signal(&(pool->condition)) != 0)
	{
		pthread_mutex_unlock(&(pool->mutex));
		return THREAD_COND_SIGNAL_FAIL;
	}

	pthread_mutex_unlock(&(pool->mutex));
	return 0;
}

int wait_pool_done(thread_pool_t *pool)
{
	int i, ret = 0;
	for (i = 0; i < pool->thread_count; i++)
	{
		if (pthread_join(pool->threads[i], NULL) != 0)
			ret = 1;
	}
	return ret;
}

int thread_pool_destory(thread_pool_t *pool)
{
	if (pool == NULL)
		return THREAD_POOL_NULL;

	if (pthread_mutex_unlock(&(pool->mutex)) != 0)
		return THREAD_MUTEX_FAIL;

	if (pool->closed)
	{
		thread_pool_free(pool);
		return THREAD_POOL_CLOSED;
	}
	pool->closed = 1;

	if(pthread_cond_broadcast(&(pool->condition))!=0 ||
		pthread_mutex_unlock(&(pool->mutex)) != 0)
	{
		thread_pool_free(pool);
		return THREAD_COND_FAIL;
	}

	wait_pool_done(pool);
	thread_pool_free(pool);
	return 0;
}

main.c,使用线程池:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "thread_pool.h"

int num = 0;
int done = 0;
pthread_mutex_t lock;

void do_task(void *arg)
{
	usleep(3000);
	pthread_mutex_lock(&lock);
	done++;
	printf("doing %d task\n", done);
	pthread_mutex_unlock(&lock);
}

int main(int argc,char **argv)
{
	int threads = 4;
	int queue_size = 256;

	if (argc == 2)
	{
		queue_size=threads = atoi(argv[1]);
		if (threads <= 0 || queue_size <= 0)
		{
			printf("threads number or queue size error: %d,%d\n", threads, queue_size);
			return 1;
		}
	}

	thread_pool_t *pool = thread_pool_create(threads, queue_size);
	if (pool == NULL)
		return THREAD_POOL_NULL;

	while (thread_pool_post(pool, &do_task, NULL) == 0)
	{
		pthread_mutex_lock(&lock);
		num++;
		pthread_mutex_unlock(&lock);
	}

	printf("add %d tasks\n", num);

	wait_pool_done(pool);
	printf("did %d tasks\n", done);
	thread_pool_destory(pool);
	return 0;
}

四、哪些开源项目使用了线程池

4.1、nginx中的线程池

niginx github开源地址:https://github.com/nginx/nginx

nginx中线程池的作用是处理文件缓冲。 nginx线程池默认关闭,configure 时,需要 --with-threads
来指定。
线程池作用阶段:

(1)使用线程池的情况:nginx可以应用于静态web服务器,主要是处理文件缓冲,文件读写比较耗时。nginx推荐使用sendfile、directio、aio来处理耗时的任务,线程池不是重点推荐。nginx的耗时主要集中在文件操作,即compute阶段,这时可以使用线程池解决。

(2)使用线程池原因:磁盘IO读写比较耗时。nginx推荐使用sendfile、directio、aio来处理耗时的任务,线程池不是重点推荐。

(3)使用线程池:nginx线程池会有两个队列,任务消息队列和完成消息队列;任务消息队列存放发布的任务,将任务pull到线程池;线程池处理完会将结果push到完成消息队列,通知主线程获取结果。

static ngx_int_t
ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
	// 其他代码...
	task->event.data = r;
    task->event.handler = ngx_http_cache_thread_event_handler;
    
	if (ngx_thread_task_post(tp, task) != NGX_OK) {
        return NGX_ERROR;
    }
    // 其他代码...
}

调用逻辑:

4.2、redis中的线程池

redis是作为一个数据库,需要读写大量的数据、解析协议,这样一来读IO和写IO压力都非常大。因此,redis中线程池的作用是读写IO处理和数据包解析、压缩。
线程池作用阶段(recv、decode、encode、send):

redis使用线程池的条件:有多个客户端并发请求,并且有读写IO的问题(写日志业务、读大量数据等)。

redis线程池运行原理:

主线程收集所有的读事件,并放到一个队列中;线程池为每个线程都准备一个自己线程的队列;然后主线程将收集的事件分发到线程池IO线程的队列中,线程池的线程从自己的队列中取出任务、执行任务;主线程既是生产者也是消费者,主线程处理Compute阶段的业务逻辑。

每个线程都有自己的队列原因:避免加锁。

总结

(1)线程池,就是固定线程数量,复用线程不销毁。
(2)线程池是一种生产者–消费者模型,某类任务特别耗时,会严重影响该线程处理其他任务,因此需要线程池。
(3)线程池是面向生产者的,生产者使用线程。线程池最好至少设计两个队列,任务队列和完成队列。
(4)通常,线程池的线程调度使用互斥锁(mutex)和条件变量(condition)。但也不一定非得使用condition(条件变量),可以只有互斥锁,比如redis,主线程获取mutex,其他线程会一直阻塞着,巧妙的利用互斥锁使其他线程休眠。
(5)线程池的线程数量选择,依据业务是IO密集型还是CPU密集型;假设CPU核心数为N,IO密集型一般设置为2*N+2,CPU密集型设为N。可以参考这样一个公式:(IO处理时间+CPU运行时间)*核心数/CPU运行时间。
(6)线程池的作用:复用线程资源,充分利用系统资源,异步执行耗时任务。

关注公众号《Lion 莱恩呀》随时随地学习技术。
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。