深入线程池原理
1 翻一翻ThreadPoolExecutor
1.1 类关系图
1.2 成员变量—线程池状态(生命周期)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池的五种状态(生命周期):
-
RUNNING:正在运行,接受新任务和处理排队的任务
-
SHUTDOWN:准备关闭,不接受新任务,但处理排队的任务
-
STOP:停止,不接受新任务,不处理排队的任务,中断正在进行的任务
-
TIDYING:整理,所有的任务都已经终止,workerCount为0,转换到TIDYING状态的线程将运行terminated()钩子方法
-
TERMINATED:终止, terminated()已经完成这些值之间的数字顺序很重要,允许顺序比较。 runState随时间单调地增加,但不需要触及每个状态
1.3 四个构造方法
/**
* 使用默认ThreadFactory和Handler
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 使用默认Handler
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 使用默认ThreadFactory
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 真正执行的构造方法
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池的七大参数:
-
corePoolSize:核心线程数,核心池大小是保持活动的最小工作线程数(不允许超时等),除非设置了allowCoreThreadTimeOut,在这种情况下,最小值为零。
-
maximumPoolSize:最大线程数,实际的最大值在内部是由CAPACITY限制的。
-
keepAliveTime:等待工作的空闲线程的超时(默认以纳秒计)。 当存在超过corePoolSize或allowCoreThreadTimeOut时,线程使用此超时。 否则他们将永远等待新的工作。
-
unit:等待工作的空闲线程的超时(默认以纳秒计)的单位。
-
workQueue:用于保存任务并将其传递给工作线程的队列。 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),所以只依赖isEmpty来查看队列是否为空(例如,当决定是否从SHUTDOWN转换到TIDYING时,我们必须这样做)。 这适用于特殊用途的队列,如DelayQueues,允许poll()返回null,即使稍后当延迟过期时它可能返回非null。
-
threadFactory:工厂的新线程。所有线程都是使用这个工厂创建的(通过方法addWorker)。所有调用者都必须做好addWorker失败的准备,这可能反映了系统或用户限制线程数量的策略。即使没有将其视为错误,创建线程失败也可能导致新任务被拒绝或现有任务仍卡在队列中。我们进一步保留池不变量,甚至在遇到OutOfMemoryError之类的错误时,这些错误可能在尝试创建线程时抛出。这类错误相当常见,因为需要在Thread中分配本机堆栈。启动时,用户将希望执行清理池关闭来清理。可能会有足够的内存可用来完成清理代码,而不会遇到另一个OutOfMemoryError。
-
handler:在执行中饱和或关闭时调用的处理程序。
1.4 execute方法
/**
* 在将来某个时候执行给定的任务。 任务可以在新线程中执行,也可以在现有的池线程中执行。
*
* 如果由于该执行器已关闭或已达到其容量,任务无法提交执行,则由当前处理
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 分三步进行:
* 1. 如果运行的线程少于corePoolSize,则尝试使用给定命令作为其第一个任务启动一个新线程。 addWorker的调 * 用会自动检查runState和workerCount,从而通过返回false来防止在不应该添加线程时产生的错误警报。
* 2. 如果一个任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程(因为自上次检查后,现有的 * 已经死亡)或进入此方法后池关闭。 所以我们重新检查状态,如有必要则回滚正在排队的if停止,或启动一个新 * 的线程,如果没有。
* 3. 如果不能对任务进行排队,则尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了所以拒绝这个 * 任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2 看一看线程池的底层原理
2.1 线程的状态(生命周期)转换
-
ctl:主线程池的状态,用AtomicInteger原子类表示,
-
RUNNING:初始化时默认为RUNNING状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
SHUTDOWN:shutdown()方法,线程池不能够接受新的任务,它会等待所有任务执行完毕;
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
-
STOP:shutdownNow()方法线程池不能接受新的任务,并且会去尝试终止正在执行的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
-
TIDYING:
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
-
TERMINATED:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
2.2 线程池的阻塞队列
由此可见BlockingQueue是一个继承与Queue的接口,而他的具体实现最常用的无非一下几种
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
- DelayQueue:一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。不能将null元素放置到这种队列中。
- LinkedBlockingDuque:LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- LinkedTransferQueue:一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法。
- PriorityBlockingQueue:一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
2.3 线程池的执行过程
2.3.1 调用execute方法,参数为Runnable接口
用到了可重入锁和双重检查
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果运行的线程少于corePoolSize,则尝试使用给定命令作为其第一个任务启动一个新线程
if (workerCountOf(c) < corePoolSize) {
// 调用addWorker
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果不能对任务进行排队,则尝试添加一个新的线程。如果失败,拒绝这个任务。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2.3.2 addWorker加入任务到线程池
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//查看状态
int c = ctl.get();
//查看正在运行的任务
int rs = runStateOf(c);
// 只在必要时检查队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//worker数量
int wc = workerCountOf(c);
//判断:核心线程判断是否超过核心线程数,非核心线程判断是否超过最大线程数,超过则返回FALSE
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//新任务是否启动
boolean workerStarted = false;
//新任务是否已添加
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 保持锁定时重新检查,在ThreadFactory失败时退出 ,在获得锁之前关闭
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//预先检查t是可启动的
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//添加任务异常
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.4 Worker的类结构
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* 这个类永远不会被序列化,但是我们提供了一个serialVersionUID来抑制javac警告。
*/
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
/**
* 使用给定的第一个任务和ThreadFactory中的线程创建。
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 在runWorker之前禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 将主运行循环委托给外部的runWorker */
public void run() {
runWorker(this);
}
// 锁方法
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
2.5 线程池的拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
3 线程池的使用
[推荐文章](线程池的使用)
- 点赞
- 收藏
- 关注作者
评论(0)