Java多线程 ThreadPoolExecutor自定义线程池
一、说明
ThreadPoolExecutor
- Java提供的线程池Executor框架相关的工具类中,最核心的是ThreadPoolExecutor
- 它有多个构造方法来实现自定义创建线程池,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等
二、理解
ThreadPoolExecutor
java.util.cocurrent
包下ThreadPoolExecutor
类继承AbstractExecutorService
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
核心线程数,默认为0,有任务时将创建线程去执行,当线程数达到corePoolSize时,停止线程创建,将任务放到队列中等待;调用prestartAllCoreThreads()
或prestartCoreThread()
方法,可以预创建线程maximumPoolSize
线程池线程数,当线程数达到核corePoolSize时,如果任务队列已满,线程池会创建新的线程,直到线程数量达到最maxPoolSizekeepAliveTime
线程池中超过corePoolSize的空闲线程最大存活时间,超时则销毁,直到线程数量等于corePoolSizeTimeUnit
keepAliveTime时间单位workQueue
阻塞任务队列,用来存储等待执行的任务threadFactory
线程工厂,可以更改线程的名称,线程组,优先级,守护进程状态等RejectedExecutionHandler
拒绝执行策略,当提交任务数超过maximumPoolSize+workQueue时,再提交任务的会交给拒绝策略去处理
workQueue
- 当线程池任务线程数量 < corePoolSize,执行器会创建一个新的线程来执行新添加的任务
- 当线程池任务线程数量 > corePoolSize,且workQueue未满时,执行器将新添加的任务放到workQueue中
- 当线程池任务线程数量 > corePoolSize,且workQueue已满时,行器会创建一个新的线程来执行新添加的任务,直到超过maximumPoolSize执行拒绝策略
队列策略
-
Direct handoffs
直接握手队列,使用SynchronousQueue队列,提交的任务会马上执行,不会被保存,如果执行任务线程数小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize,则执行拒绝策略 -
Bounded queues
有界队列,一般使用ArrayBlockingQueue制定队列的最大长度,当创建线程数达到corePoolSize时,若有新任务加入,则直接进入任务队列等待,若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数达到maximumPoolSize,则执行拒绝策略 -
Unbounded queues
无界队列,一般使用无预定长度的LinkedBlockingQueue,当线程数达到corePoolSize后,若有新任务加入,则直接进入任务队列等待,任务队列可以无限添加新的任务,maximumPoolSize参数是无效的
RejectedExecutionHandler
- 当线程池已经被关闭,或者任务数超过maximumPoolSize+workQueue时执行拒绝策略
ThreadPoolExecutor.AbortPolicy
默认拒绝策略,丢弃任务并抛出RejectedExecutionException异常ThreadPoolExecutor.DiscardPolicy
直接丢弃任务,但不抛出异常ThreadPoolExecutor.DiscardOldestPolicy
丢弃任务队列最先加入的任务,再执行execute方法把新任务加入队列执行ThreadPoolExecutor.CallerRunsPolicy
会调用当前线程池的所在的线程去执行被拒绝的任务
线程池任务的执行流程
三、实现
1.SynchronousQueue
创建 ThreadPoolExecutorTest
类,默认使用ThreadPoolExecutor.AbortPolicy
拒绝策略,队列是SynchronousQueue
,超出核心线程的任务会创建新的线程来执行,设置核心线程数最大值为4,线程池线程数最大值为8,最大等待时间为5秒
public class ThreadPoolExecutorTest {
public static void main(String[] args) throws InterruptedException {
// 1.创建自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 2.创建线程任务
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " run");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 3.执行任务
System.out.println("设置核心线程数最大值为4,线程池线程数最大值为8");
System.out.println("&&&&开始执行线程&&&&");
System.out.println("----执行4个任务----");
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize());
System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize());
System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());
System.out.println("----再执行4个任务----");
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize());
System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize());
System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());
Thread.sleep(10000);
System.out.println("----休眠10秒后----");
System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize());
System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize());
System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());
// 4.关闭线程池
threadPoolExecutor.shutdown();
}
}
一共有4个核心,超过核心线程将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收
但如果再执行4个任务,线程数超过maximumPoolSize
,再提交任务将被丢弃并抛出RejectedExecutionException
异常
2.ArrayBlockingQueue
当线程数达到corePoolSize
后,若有新任务加入,则直接进入任务队列等待,超出队列的任务会创建新的线程来执行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
一共有4个核心,当线程数超过corePoolSize+workQueue
时,将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收
但如果再执行4个任务,线程数超过maximumPoolSize+workQueue
,再提交任务将被丢弃并抛出RejectedExecutionException
异常
3.LinkedBlockingQueue
当线程数达到corePoolSize
后,若有新任务加入,则直接进入任务队列等待,任务队列可以无限添加新的任务,maximumPoolSize
参数是无效的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
一共有4个核心,超过核心线程将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收
但当设置LinkedBlockingDeque
容量限制为4时,当线程数超过corePoolSize+workQueue
时,将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
但如果再执行4个任务,线程数超过maximumPoolSize+workQueue
,再提交任务将被丢弃并抛出RejectedExecutionException
异常
- 点赞
- 收藏
- 关注作者
评论(0)