【从0到1学习Java线程池】Java线程池原理
这是【从0到1学习Java线程池】系列文章的第 贰 篇,该系列文章总共三篇,介绍了 Java 线程池的使用以及原理,并且最后会实现一个基本的线程池。本篇文章介绍了 Java 线程池的原理。
在上一篇文章中(【从0到1学习Java线程池】Java线程池的简介以及使用),我们总结了线程池的3个优点:
- 线程复用
- 控制最大并发数
- 管理线程
这篇文章会分别从这三个方面,结合具体的代码实现来剖析 Java 线程池的原理以及它的具体实现。
线程复用
我们知道线程池的一个作用是创建和销毁线程的次数,每个工作线程可以多次使用。这个功能就是线程复用。想要了解 Java 线程池是如何进行线程复用的,我们首先需要了解线程的生命周期。
线程生命周期
下图描述了线程完整的生命周期:

在一个线程完整的生命周期中,它可能经历五种状态:新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)、终止(Zombie)。
在 Java中,Thread 通过new来新建一个线程,这个过程是是初始化一些线程信息,如线程名、id、线程所属group等,可以认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其创建方法调用栈和程序计数器,同时将hasBeenStarted为true,之后如果再次调用start()方法就会有异常。
处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于 JVM 里线程调度器的调度。当线程获取CPU后,run()方法会被调用。不要自己去调用Thread的run()方法。之后根据CPU的调度,线程就会在就绪—运行—阻塞间切换,直到run()方法结束或其他方式停止线程,进入终止状态。
因此,如果要实现线程的复用,我们必须要保证线程池中的线程保持存活状态(就绪、运行、阻塞)。接下来,我们就来看看ThreadPoolExecutor是如何实现线程复用的。
Worker 类
ThreadPoolExecutor主要是通过一个类来控制线程复用的:Worker 类。
我们来看一下简化后的 Worker 类代码:
private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null){ task.run(); } } ……
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
从代码中,我们可以看到 Worker 实现了 Runnable 接口,并且它还有一个 Thread成员变量 thread,这个 thread 就是要开启运行的线程。我们看到 Worker 的构造方法中传递了一个 Runnable 参数,同时它把自己作为参数传入 newThread(),这样的话,当 Thread 的start()方法得到调用时,执行的其实是 Worker 的run()方法,即runWorker()方法。
runWorker()方法之中有一个 while 循环,使用 getTask()来获取任务,并执行。接下来,我们将会看到getTask()是如何获取到 Runnable 对象的。
getTask()
我们来看一下简化后的getTask()代码:
private Runnable getTask() {
if(一些特殊情况) { return null;
}
Runnable r = workQueue.take();
return r;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
我们可以看到任务是从 workQueue中获取的,这个 workQueue 就是我们初始化 ThreadPoolExecutor 时存放任务的 BlockingQueue队列,这个队列里的存放的都是将要执行的 Runnable任务。因为 BlockingQueue 是个阻塞队列,BlockingQueue.take()返回的是空,则进入等待状态直到 BlockingQueue 有新的对象被加入时唤醒阻塞的线程。所以一般情况下,Thread的run()方法不会结束,而是不断执行workQueue里的Runnable任务,这就达到了线程复用的目的了。
控制最大并发数
我们现在已经知道了 Java 线程池是如何做到线程复用的了,但是Runnable 是什么时候被放入 workQueue 队列中的呢,Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢?从上面的分析中我们可以看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?它又是如何做到控制最大并发数的呢?
execute()
通过查看 execute()就能解答上述的一些问题,同样是简化后的代码:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 当前线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。 if (addWorker(command, true)) return; c = ctl.get();
}
// 活动线程数 >= corePoolSize
// runState为RUNNING && 队列未满
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检验是否为RUNNING状态 // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command); // 采用线程池指定的策略拒绝任务 // 两种情况: // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
} else if (!addWorker(command, false)) reject(command);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
addWorker()
我们再来看一下addWorker()的简化代码:
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) { return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
根据上面的代码,线程池工作过程中是如何添加任务的就很清晰了:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException
如果通过addWorker()成功创建新的线程,则通过start()开启新线程,同时将firstTask作为这个Worker里的run()中执行的第一个任务。虽然每个Worker的任务是串行处理,但如果创建了多个Worker,因为共用一个workQueue,所以就会并行处理了。所以可以根据corePoolSize和maximumPoolSize来控制最大并发数。
过程如下图所示:

一个例子
如果是做 Android 开发的,并且对 Handler 原理比较熟悉,你可能会觉得这个图挺熟悉,其中的一些过程和Handler,Looper,Meaasge使用中,很相似。Handler.send(Message)相当于execute(Runnuble),Looper中维护的Meaasge队列相当于BlockingQueue,只不过需要自己通过同步来维护这个队列,Looper中的loop()函数循环从Meaasge队列取Meaasge和Worker中的runWork()不断从BlockingQueue取Runnable是同样的道理。
管理线程
上边的文章已经讲了,通过线程池可以很好的管理线程的复用,控制并发数,以及销毁等过程,而线程的管理过程已经穿插在其中了,也很好理解。
在 ThreadPoolExecutor 有个AtomicInteger变量 ctl,这一个变量保存了两个内容:
- 所有线程的数量
- 每个线程所处的状态
其中低29位存线程数,高3位存runState,通过位运算来得到不同的值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到线程的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//得到Worker的的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 判断线程是否在运行
private static boolean isRunning(int c) { return c < SHUTDOWN; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
这里主要通过shutdown和shutdownNow()来分析线程池的关闭过程。首先线程池有五种状态来控制任务添加与执行。主要介绍以下三种:
- RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
- SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
- STOP状态:不再接受新任务,不处理队列中的任务
shutdown()这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程,而仍在工作的线程不受影响,所以队列中的任务人会被执行;shutdownNow()方法将runState置为STOP。和shutdown()方法的区别是,这个方法会终止所有的线程,所以队列中的任务也不会被执行了。
参考资料:http://www.kuqin.com/shuoit/20160829/352799.html
本文的版权归作者 罗远航 所有,采用 Attribution-NonCommercial 3.0 License。任何人可以进行转载、分享,但不可在未经允许的情况下用于商业用途;转载请注明出处。感谢配合!
文章来源: blog.csdn.net,作者:冰水比水冰,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/luoyhang003/article/details/58242166
- 点赞
- 收藏
- 关注作者
评论(0)