【源码解析】Java线程池

举报
小明的混沌之路 发表于 2022/07/31 14:45:49 2022/07/31
【摘要】 什么是线程池?为什么要用线程池?

前言:📫 作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫 

🏆 Java领域优质创作者、阿里云专家博主、华为云专家🏆

🔥 如果此文还不错的话,还请👍关注点赞收藏三连支持👍一下博主哦

本文导读

​什么是线程池?为什么要用线程池?

一、什么是线程池?为什么要用线程池?

        首先谈线程池要说下什么是线程池,线程池 是一种多线程处理方式,创建若干个可执行的线程放入一个容器(队列)中,从容器(队列)中获取线程不用自行创建,使用完毕不需要销毁线程而是放回容器(队列)中,从而减少创建和销毁线程对象的开销。

        使用线程池的好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。

二、开始谈JDK中的线程池【浅线程池使用、着重线程池工作机制】

背诵直接答,涉及线程池创建线程原理,关闭api的区别,原理可以详细聊一下,10min左右

        我们一般使用 ThreadPoolExecutor 创建线程池,提交任务 execute()方法和 submit()方法,execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,

【扩展】并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

        关闭线程池使用 shutdown() shutdownNow() ,原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt() 方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow() 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。shutdown() 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程,无返回。

        前面所说ThreadPoolExecutor 继承自Executor接口,Executor框架的大部分类都直接或间接地实现了此接口。该接口只有一个方法void execute(Runnable command): 意思是在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。

        线程池中创建线程并执行用的方法是 addWorker(Runnable firstTask, boolean core)方法。在执行addWorker的时候,worker 类会创建线程getThreadFactory().newThread(this),创建好线程以后,线程会启动,t.start() 实际调用的就是worker类中的 run() 方法,该方法的实质是运行runWorker() 方法,在执行该方法的时候就会从阻塞队列中获取任务,获取任务成功以后执行线程,完成任务即可。其中的 firstTask 用于指定新增线程执行的第一个任务,如果没有任务执行可以为null;boolean core 主要是判断当前线程池中活动的核心线程数是否达到最大,如果达到最大的话就创建非核心线程,该值为false,如果没有达到最大核心线程数量,则为true,创建核心线程。

三、开始谈合理配置线程池【经常爱问的——如何合理配置线程池】

背诵直接答,线程池的创建各个参数含义,自定义线程池的配置详细聊一下,10min左右

        我们创建线程池需要初始化线程池,这里面有几个重要的参数corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler

        第1个、corePoolSize 线程池中的核心线程数;当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

        我们的任务一般分为CPU密集型任务、IO密集型任务、混合型任务。CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1,而对于混合型的任务,如果可以拆分,拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。

        第2个、maximumPoolSize 线程池中允许的最大线程数,如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,但是当前线程数要小于maximumPoolSize。
        第3个、keepAliveTime  线程空闲时的存活时间,当线程没有任务执行时,继续存活的时间。
        第4个、 workQueue 用于保存等待执行的任务的阻塞队列,一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue

【扩展】这里面有个问题,使用无界队列作为工作队列会对线程池带来上的影响,当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超corePoolSize。同时 maximumPoolSize、keepAliveTime将是一个无效参数,可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。

        第5个、 threadFactory 线程工厂,可以对线程做更多的设置;
        第6个、 RejectedExecutionHandler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略,一、AbortPolicy:直接抛出异常,默认策略;二、CallerRunsPolicy:用调用者所在的线程来执行任务;三、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;四、DiscardPolicy:直接丢弃任务;同时我们也可以自定义饱和策略。

四、jdk为我们定义的线程池—Executors五种线程池【说说你了解哪些线程池、使用场景】

背诵直接答,说说你了解哪些jdk自定义线程池、使用场景,3min左右

        jdk为我们定义了一些线程池,我常用的有,
        
FixedThreadPool,是我们常用的创建使用固定线程数的线程池,使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
        SingleThreadExecutor,创建使用单个线程的线程池,corePoolSize和maximumPoolSize被设置为1。其他参数与FixedThreadPool相同。
        CachedThreadPool,是常用的无界线程池创建一个,根据需要创建新线程的线程池。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。
        ScheduledThreadPoolExecutor的含义是使用工厂类Executors来创建。使用场景比如,ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

【扩展】WorkStealingPool ,利用所有运行的处理器数目来创建一个工作窃取的线程池,使用forkjoin实现。

五、说你项目中实际应用的线程池【自定义线程池实现、工作中线程池实际的配置等等】

自定义线程池简单实现:

import tool.SleepTools;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * 自定义线程池实现
 * @author XiaoMing
 * @date 2021/9/12-10:06
 */
public class MyThreadPool {
    private static int WORK_COUNT = 5; /*缺省线程数据量*/
    private final BlockingQueue<Runnable> taskQueue; /*存放任务-阻塞队列*/
    /*工作线程*/
    private WorkThread[] workThreads; 
    private final int work_number;
    /*任务数,线程的数量*/
    public MyThreadPool(int task_count, int work_number) {
        if (work_number <= 0) work_number = WORK_COUNT;
        if (task_count <= 0) task_count = 100;
        this.taskQueue = new ArrayBlockingQueue<>(task_count);
        this.work_number = work_number;
        workThreads = new WorkThread[work_number];
        for (int i = 0; i < work_number; i++) { // 工作线程准备结束
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
    }
    /*销毁线程池*/
    public void destroy() {
        System.out.println("ready close pool....");
        for (int i = 0; i < work_number; i++) {
            workThreads[i].stopWorker(); // 中断线程
            workThreads[i] = null; // help gc
        }
        taskQueue.clear();
    }
    /*放入任务,但是只是加入队列*/
    public void execute(Runnable task) {
        try {
            // 将指定元素插入此队列中,将等待可用的空间.通俗点说就是>maxSize 时候,阻塞,直到能够有空间插入元素
            taskQueue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Override
    public String toString() {
        return "WorkThread number:" + work_number + " wait task number:" + taskQueue.size();
    }
    /*内部类,工作线程的实现*/
    private class WorkThread extends Thread {
        @Override
        public void run() {
            try { // 获取线程的中断状态,确认中断,跳出循环
                while (!isInterrupted()) { // 获取并移除此队列的头部,在元素变得可用之前一直等待。queue的长度==0的时候,一直阻塞
                    Runnable r = taskQueue.take();
                    if (r != null) {
                        System.out.println("===ready execute" + ((MyTask) r).getName());
                        r.run();
                    }
                }
            } catch (InterruptedException e) {
            }
        }
        /*停止工作*/
        public void stopWorker() {
            interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建3个线程的线程池
        MyThreadPool t = new MyThreadPool(0, 3);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy(); // 所有线程都执行完成才destory中断
        System.out.println(t);
    }

    // 任务类
    static class MyTask implements Runnable {
        private String name;
        private Random r = new Random();
        public MyTask(String name) {
            this.name = name;
        }
        public String getName() {
            return name;
        }
        @Override
        public void run() { // 执行任务
            SleepTools.ms(r.nextInt(100) * 5);
            System.out.println("任务 " + name + " 完成");
        }
    }
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。