线程池并发服务器

举报
xcc-2022 发表于 2022/10/24 21:08:01 2022/10/24
【摘要】 了解线程池并发服务器​ 首先什么是线程池?​ 线程池是一个抽象概念,可以简单的认为若干线程在一起运行,线程不退出,等待有任务处理。​ 为什么要有线程池?以网络编程服务器端为例,作为服务器端支持高并发,可以有多个客户端连接,发出请求,对于多个请求我们每次都去建立线程,这样线程会创建很多,而且线程执行完销毁也会有很大的系统开销,使用上效率很低。之前在线程篇章中,我们也知道创建...

了解线程池并发服务器

​ 首先什么是线程池?

​ 线程池是一个抽象概念,可以简单的认为若干线程在一起运行,线程不退出,等待有任务处理。

​ 为什么要有线程池?

  1. 以网络编程服务器端为例,作为服务器端支持高并发,可以有多个客户端连接,发出请求,对于多个请求我们每次都去建立线程,这样线程会创建很多,而且线程执行完销毁也会有很大的系统开销,使用上效率很低。

  2. 之前在线程篇章中,我们也知道创建线程并非多多益善,所以我们的思路是提前创建好若干个线程,不退出,等待任务的产生,去接收任务处理后等待下一个任务。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JkidoxsH-1666136169813)(C:\Users\xiao\AppData\Roaming\Typora\typora-user-images\image-20221002155451914.png)]

线程池如何实现?需要思考2个问题?

  1. 假设线程池创建了,线程们如何去协调接收任务并且处理?

  2. 线程池上的线程如何能够执行不同的请求任务?

上述问题1就很像我们之前学过的生产者和消费者模型,客户端对应生产者,服务器端这边的线程池对应消费者,需要借助一个互斥锁和两个条件变量循环队列来搞定。

问题2解决思路就是利用回调机制,我们同样可以借助结构体的方式,对任务进行封装,比如任务的数据和任务处理回调都封装在结构体上,这样线程池的工作线程拿到任务的同时,也知道该如何执行了。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gzpD0t31-1666136169814)(C:\Users\xiao\AppData\Roaming\Typora\typora-user-images\image-20221002095303510.png)]

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>


typedef struct _PoolTask
{
    int tasknum;//模拟任务编号
    void *arg;//回调函数参数
    void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;

typedef struct _ThreadPool
{
    int max_job_num;//最大任务个数
    int job_num;//实际任务个数
    PoolTask *tasks;//任务队列数组
    int job_push;//入队位置
    int job_pop;// 出队位置

    int thr_num;//线程池内线程个数
    pthread_t *threads;//线程池内线程数组
    int shutdown;//是否关闭线程池
    pthread_mutex_t pool_lock;//线程池的锁
    pthread_cond_t empty_task;//任务队列为空的条件(没任务)
    pthread_cond_t not_empty_task;//任务队列不为空的条件(有任务)

}ThreadPool;

void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数

#endif
//简易版线程池
#include "threadpoolsimple.h"

ThreadPool *thrPool = NULL;

int beginnum = 1000;
//消费者
void *thrRun(void *arg)//取任务,做任务
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool *pool = (ThreadPool*)arg;
    int taskpos = 0;//任务位置
    PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));//malloc一个任务队列,防止循环被覆盖

    while(1)
	{
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

		//无任务并且线程池不是要摧毁,直到生产者发送任务信号
        while(thrPool->job_num <= 0 && !thrPool->shutdown )
		{
			//如果没有任务,线程会阻塞
            pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
        }
        //有任务处理
        if(thrPool->job_num)
		{
            //有任务需要处理
            taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//取(0++)%20
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
			//为什么要拷贝?避免任务被修改,生产者会添加任务
            memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));
            task->arg = task;//指向任务队列首地址
            thrPool->job_num--;
            //task = &thrPool->tasks[taskpos];
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if(thrPool->shutdown)
		{
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
			pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        task->task_func(task->arg);//执行回调函数
    }
    
    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
    printf("begin call %s-----\n",__FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));//malloc一个线程池

    thrPool->thr_num = thrnum;
    thrPool->max_job_num = maxtasknum;
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列(20)

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock,NULL);
    pthread_cond_init(&thrPool->empty_task,NULL);
    pthread_cond_init(&thrPool->not_empty_task,NULL);

    int i = 0;
    thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id号的空间
	
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);//设置线程分离
    for(i = 0;i < thrnum;i++)
	{
        pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程
    }
    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//通知所以阻塞的线程信号  诱杀 

    /*int i = 0;
    for(i = 0; i < pool->thr_num ; i++)
	{
        pthread_join(pool->threads[i],NULL);
    }*/

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);//释放线程池
}
//生产者
//添加任务到线程池
void addtask(ThreadPool *pool)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

	//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while(pool->max_job_num <= pool->job_num)//20<=20
	{
        pthread_cond_wait(&pool->empty_task,&pool->pool_lock);//任务满了,通知消费者
    }

    int taskpos = (pool->job_push++)%pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;//任务编号
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知消费者
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void *arg)
{
    PoolTask *task = (PoolTask*)arg;
    int num = task->tasknum;
    printf("task %d is runing %lu\n",num,pthread_self());

    sleep(1);
    printf("task %d is done %lu\n",num,pthread_self());
}


int main()
{
    create_threadpool(3,20);//3个线程,任务最多20
    int i = 0;
    for(i = 0;i < 50 ; i++)
	{
        addtask(thrPool);//模拟添加任务
    }

    sleep(20);
    destroy_threadpool(thrPool);

    return 0;
}

epoll加线程池

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "sys/epoll.h"
#include "wrap.h"

typedef struct _PoolTask
{
    int tasknum;//模拟任务编号
    void *arg;//回调函数参数
    void (*task_func)(void *arg);//任务的回调函数
    int fd;
    int epfd;
    struct epoll_event *evs;

}PoolTask ;

typedef struct _ThreadPool
{
    int max_job_num;//最大任务个数
    int job_num;//实际任务个数
    PoolTask *tasks;//任务队列数组
    int job_push;//入队位置
    int job_pop;// 出队位置

    int thr_num;//线程池内线程个数
    pthread_t *threads;//线程池内线程数组
    int shutdown;//是否关闭线程池
    pthread_mutex_t pool_lock;//线程池的锁
    pthread_cond_t empty_task;//任务队列为空的条件
    pthread_cond_t not_empty_task;//任务队列不为空的条件

}ThreadPool;

void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
//void addtask(ThreadPool *pool);//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs);//
void taskRun(void *arg);//任务回调函数

#endif
#include "threadpoolsimple.h"

ThreadPool *thrPool = NULL;

int beginnum = 1000;

void *thrRun(void *arg)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool *pool = (ThreadPool*)arg;
    int taskpos = 0;//任务位置
    PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));

    while(1)
	{
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

		//无任务并且线程池不是要摧毁
        while(thrPool->job_num <= 0 && !thrPool->shutdown )
		{
			//如果没有任务,线程会阻塞
            pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
        }
        
        if(thrPool->job_num)
		{
            //有任务需要处理
            taskpos = (thrPool->job_pop++)%thrPool->max_job_num;
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
			//为什么要拷贝?避免任务被修改,生产者会添加任务
            memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));
            task->arg = task;
            thrPool->job_num--;
            //task = &thrPool->tasks[taskpos];
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if(thrPool->shutdown)
		{
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
			pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        printf("001\n");
        task->task_func(task->arg);//执行回调函数
        printf("002\n");
    }
    
    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
    printf("begin call %s-----\n",__FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));

    thrPool->thr_num = thrnum;
    thrPool->max_job_num = maxtasknum;
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock,NULL);
    pthread_cond_init(&thrPool->empty_task,NULL);
    pthread_cond_init(&thrPool->not_empty_task,NULL);

    int i = 0;
    thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间
	
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    for(i = 0;i < thrnum;i++)
	{
        pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程
    }
    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//诱杀 

    int i = 0;
    for(i = 0; i < pool->thr_num ; i++)
	{
        pthread_join(pool->threads[i],NULL);
    }

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);
}

//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

	//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while(pool->max_job_num <= pool->job_num)
	{
        pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
    }

    int taskpos = (pool->job_push++)%pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->tasks[taskpos].fd = fd;
     pool->tasks[taskpos].evs = evs;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知包身工
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void *arg)
{
    printf("003\n");
    PoolTask *task = (PoolTask*)arg;
     char buf[1024]="";
    int n = Read(task->fd , buf,sizeof(buf));
    if(n == 0 )
        {
         close(task->fd);//关闭cfd
        epoll_ctl(task->epfd,EPOLL_CTL_DEL,task->fd,task->evs);//将cfd上树
            printf("client close\n");
        }
    else if(n> 0)
        {
         printf("%s\n",buf );
         Write(task->fd ,buf,n);

        }
 printf("004\n");
   
}


int main()
{
    create_threadpool(3,20);
    int i = 0;
    //创建套接字,绑定
    int lfd = tcp4bind(8000,NULL);
    //监听
    listen(lfd,128);
    //创建树
    int epfd = epoll_create(1);
    struct epoll_event ev,evs[1024];
    ev.data.fd = lfd;
    ev.events = EPOLLIN;//监听读事件
    //将ev上树
    epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev);
    while(1)
    {
        int nready = epoll_wait(epfd,evs,1024,-1);
        if(nready < 0)
            perr_exit("err");
        else if(nready == 0)
            continue;
        else if(nready > 0 )
        {
            for(int i=0;i<nready;i++)
            {
                if(evs[i].data.fd == lfd && evs[i].events & EPOLLIN)//如果是lfd变化,并且是读事件
                {
                        struct sockaddr_in cliaddr;
                        char buf_ip[16]="";
                        socklen_t len  = sizeof(cliaddr);
                        int cfd = Accept(lfd,(struct sockaddr *)&cliaddr,&len);
                        printf("client ip=%s port=%d\n",inet_ntop(AF_INET,
                        &cliaddr.sin_addr.s_addr,buf_ip,sizeof(buf_ip)),
                        ntohs(cliaddr.sin_port));
                        ev.data.fd = cfd;//cfd上树
                        ev.events = EPOLLIN;//监听读事件
                        epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev);//将cfd上树

                }
                else if(evs[i].events & EPOLLIN)//普通的读事件
                {
                    printf("###########1\n");
                    addtask(thrPool,evs[i].data.fd,&evs[i]);
                     printf("###########2\n");
                    // char buf[1024]="";
                    // int n = Read(evs[i].data.fd , buf,sizeof(buf));
                    // if(n <= 0 )
                    // {
                    //     close(evs[i].data.fd);//关闭cfd
                    //     epoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]);//将cfd上树
                    //     printf("client close\n");
                    // }
                    // else
                    // {
                    //     printf("%s\n",buf );
                    //     Write(evs[i].data.fd ,buf,n);

                    // }

                }
            }

        }
    }
    close(lfd);

   
    destroy_threadpool(thrPool);

    return 0;
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QMcpFjNz-1666136169816)(C:\Users\xiao\AppData\Roaming\Typora\typora-user-images\image-20221002224201153.png)]

推荐

华为开发者空间发布

让每位开发者拥有一台云主机

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。