并发编程进阶-08
1.线程池的实现原理?
当提交一个新任务到线程池时,线程池的处理流程如下。
- 线程池判断核心线程池里的线程是否都在执行任务.如果不是,则创建一个新的工作线程来执行任务.如果核心线程池里的线程都在执行任务,则进入下个流程。
- 线程池判断工作队列是否已经满.如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
- 线程池判断线程池的线程是否都处于工作状态.如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
线程池创建线程时,会将线程封装成工作线程 Worker , Worker 在执行完任务后,还会循环获取工作队列里的任务来执行.我们可以从 Worker 类的 run()方法里看到这点。
ThreadPoolExecutor 执行 execute 方法分下面 4 种情况。
- 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁).
- 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
- 如果无法将任务加入 BlockingQueue (队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁).
- 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈).在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。
2.创建线程池的重要参数?
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//等待时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//等待队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) {} //拒绝策略
- 核心线程数
- 最大线程数
- 生存时间
- 时间单位
- 任务队列
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue.静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列.每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
- 线程工厂:可以自定义线程的名字,方便区分
- 拒绝策略
- AbortPolicy:直接抛出异常。是默认的策略.
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
细说参数:
-
核心线程数(Core Pool Size):
- 核心线程数是线程池中一直保持活动的线程数量,即使它们处于空闲状态。线程池会根据工作队列的任务数量自动调整活动线程的数量,但不会低于核心线程数。
-
最大线程数(Maximum Pool Size):
- 最大线程数是线程池中允许的最大线程数量。当工作队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来执行任务。
-
任务队列(Work Queue):
- 任务队列用于保存等待执行的任务。当线程池的活动线程数达到核心线程数时,新的任务会被放入任务队列等待执行。任务队列可以是有界队列(如 ArrayBlockingQueue)或无界队列(如 LinkedBlockingQueue)。
-
线程存活时间(Keep Alive Time):
- 线程存活时间是当线程池中的线程数量
超过核心线程数时
,多余的空闲线程等待新任务的最长时间。如果超过这个时间仍然没有新任务到来,超过核心线程数的空闲线程将被终止。
- 线程存活时间是当线程池中的线程数量
-
TimeUnit (线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒).
-
拒绝策略(Rejected Execution Policy):
- 当线程池的任务队列已满且活动线程数已达到最大线程数时,新的任务将无法提交执行。拒绝策略定义了当线程池无法接受新任务时的处理方式,例如抛出异常、丢弃任务、或在调用者线程中直接执行任务。
3.谈谈 PriorityQueue 理解?
PriorityQueue
是优先级队列,通过自然排序或者用 java
的比较器实现自定义排序,无界队列,但是可以在创建时指定大小,不允许有空值,默认是最小堆,当排序相同时,随机返回一个,PriorityQueue
是非线程安全的,PriorityBlockingQueue
是线程安全的,用于多线程环境.PriorityBlockingQueue
实现原理是使用了可重入锁
private final ReentrantLock lock;
PriorityQueue
通过二叉小顶堆实现,任意一个非叶子节点的权值,都不大于其左右子节点的权值
-
大根堆也叫大顶堆
-
小根堆也叫小顶堆
-
top 问题时,求最小 k 个数用大根堆,因为大根堆根节点是最大的值,保存的都是小值
-
top 问题时,求最大 k 个数用小根堆,因为小根堆根节点是最小的值,保存的都是大值
方法 | 作用 | 失败处理方式 |
---|---|---|
add() | 插入元素 | 抛出异常 |
offer() | 插入元素 | 返回 false |
element() | 获取队首元素不删除 | 抛出异常 |
peek() | 获取队首元素不删除 | null |
remove() | 取出队首元素删除 | 抛出异常 |
poll() | 取出队首元素删除 | null |
经典方法源码:从k
指定的位置开始,将x
逐层与当前点的parent
进行比较并交换,直到满足x >= queue[parent]
为止
//siftUp()
private void siftUp(int k, E x){
while (k > 0){
int parent = (k -1)>>> 1;//parentNo = (nodeNo-1)/2
Object e = queue[parent];
if (comparator.compare(x,(E) e)>= 0)//调用比较器的比较方法
break;
queue[k]= e;
k = parent;
}
queue[k]= x;
}
该方法的作用是从k
指定的位置开始,将x
逐层向下与当前点的左右孩子中较小的那个交换,直到x
小于或等于左右孩子中的任何一个为止。
//siftDown()
private void siftDown(int k, E x){
int half = size >>> 1;
while (k < half){
//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标
int child = (k << 1)+1;//leftNo = parentNo*2+1
Object c = queue[child];
int right = child +1;
if (right < size &&
comparator.compare((E) c,(E) queue[right])> 0)
c = queue[child = right];
if (comparator.compare(x,(E) c)<= 0)
break;
queue[k]= c;//然后用c取代原来的值
k = child;
}
queue[k]= x;
}
4.execute 和 submit 的区别?
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);
}
execute
和submit
都是用于向线程池提交任务的方法,但它们在使用方式和返回结果上有一些区别:
- 使用方式:
execute
:execute
方法是Executor
接口中定义的方法,它用于提交不需要返回结果的任务。该方法只接受Runnable
类型的任务,即没有返回值的任务。submit
:submit
方法是ExecutorService
接口中定义的方法,它用于提交既可以有返回结果也可以没有返回结果的任务。submit
方法可以接受Runnable
和Callable
类型的任务。Callable
是一个带有泛型返回值的任务类型,通过它可以获得任务执行的结果。
- 返回结果:
execute
:execute
方法没有返回结果,因为它只用于提交没有返回值的任务,所以无法获得任务的执行结果。submit
:submit
方法可以获得任务执行的结果。当使用submit
提交Callable
任务时,会返回一个Future
对象,通过这个对象可以异步获取任务执行的结果。
- 异常处理:
execute
:execute
方法不会抛出任务执行时的异常,因为没有返回结果,所以任务执行的异常只能由任务本身处理。submit
:submit
方法可以通过Future
对象来处理任务执行时的异常。调用Future
对象的get()
方法获取任务的执行结果时,如果任务抛出异常,get()
方法会将异常封装在ExecutionException
中并抛出。
总结:
- 如果你只关心任务的执行,不需要获取返回结果,可以使用
execute
方法。 - 如果你需要获取任务的执行结果或处理任务执行的异常,可以使用
submit
方法,并将任务封装为Callable
类型。
5.shutDown 和 shutDownNow
shutdown()
和shutdownNow()
都是用于关闭线程池的方法,但它们有一些区别:
shutdown()
方法:shutdown()
方法是ExecutorService
接口中定义的方法。- 调用
shutdown()
方法后,线程池会拒绝接受新的任务提交,但会继续执行已经提交的任务和队列中的任务。 shutdown()
方法会平缓地关闭线程池,它会等待所有已提交的任务执行完成,并且不会中断正在执行的任务。
shutdownNow()
方法:shutdownNow()
方法也是ExecutorService
接口中定义的方法。- 调用
shutdownNow()
方法后,线程池会立即停止接受新的任务提交,并且尝试中断正在执行的任务。 shutdownNow()
方法会尝试停止所有任务的执行,包括已经提交但未执行的任务,它会返回一个 List 集合,包含那些未执行的任务。
总结:
shutdown()
方法是平缓关闭线程池的方式,它会等待所有任务执行完成后关闭。shutdownNow()
方法是立即关闭线程池的方式,它会尝试中断正在执行的任务,并返回未执行的任务列表。
6.线程池监控?
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量小于或等于 taskCount。
- largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
- getActiveCount:获取活动的线程数。
7.Executor 框架的结构?
Executor 框架主要由 3 大部分组成如下。
-
任务:
包括被执行任务需要实现的接口- Runnable 接口,Runnable 不会有返回结果
- Callable 接口,Callable 有返回结果
-
任务的执行。
-
包括任务执行机制的核心接口 Executor,
-
以及继承自 Executor 的 ExecutorService 接口。
-
Executor 框架有两个关键类实现了 ExecutorService 接口(ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)
-
-
异步计算的结果。
- 包括接口 Future
- 实现 Future 接口的 FutureTask 类。
核心类和接口:
- Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。
- ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor 是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。
ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。 - Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。
- Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 Scheduled-ThreadPoolExecutor 执行。
8.说说四种线程池?
主要通过各个线程池的特点和工作队列来进行说明.
ThreadPoolExecutor:
通常使用工厂类 Executors
来创建。 Executors
可以创建 3 种类型的 ThreadPoolExecutor
:
- SingleThreadExecutor
- 适用于保证顺序执行各个任务.
- FixedThreadPool
- 适用于需要限制当前线程数量的场景.比如负载比较重的服务器.
- CachedThreadPool
- 大小无界的线程池.适用于大量短期任务.或者负载比较轻的服务器.
FixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
核心线程数和最大线程数都是用户自己设置的 size
-
多余的线程会被立即终止,等待时间为 0
-
FixedThreadPool 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列
-
队列的容量为 Integer MAX_VALUE
-
线程池中的线程数不会超过核心线程数
-
最大线程数是个无效参数
-
保活时间是个无效参数
-
不会拒绝任务
SingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 核心线程数和最大线程数都是默认的 1
- 多余的线程会被立即终止,等待时间为 0
- FixedThreadPool 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列
- 队列的容量为 Integer MAX_VALUE。
CachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心线程数为 0
- 最大线程数无界
- 保活时间是 60S,等待新任务的最长时间是 60s,超过 60s 将被终止
- CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。
- 如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。
- 极端情况下,CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源。
ScheduledThreadPoolExecuton:
ScheduledThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors 可以创建 2 种类型的 ScheduledThreadPoolExecutor,如下。
-
ScheduledThreadPoolExecutor。包含若干个线程的 ScheduledThreadPoolExecutor
-
SingleThreadScheduledExecutor。只包含一个线程的 ScheduledThreadPoolExecutor。
ScheduledFutureTask 主要包含 3 个成员变量,如下。
-
long 型成员变量 time,表示这个任务将要被执行的具体时间。
-
long 型成员变量 sequenceNumber,表示这个任务被添加到 ScheduledThreadPoolExecutor 中的序号。
-
long 型成员变量 period,表示任务执行的间隔周期。
ScheduledThreadPoolExecutor:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- 使用 DelayedWorkQueue 作为队列
- DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 Scheduled-FutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
- 核心线程数为指定值
- 最大线程数为无界值
- 保活时间为 0
SingleThreadScheduledExecutor:
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
- 核心线程数是 1
- 最大线程数无界值
- 保活时间为 0
- 使用 DelayedWorkQueue 队列
- 指定线程工厂,自定义线程名字前缀
9.不推荐 Executors 创建线程
推荐使用 ThreadPoolExecutor 方式创建线程,在阿里的 Java 开发手册时有一条是不推荐使用 Executors 去创建,而是推荐去使用 ThreadPoolExecutor 来创建线程池。
这样做的目的主要原因是:使用 Executors 创建线程池不会传入核心参数,而是采用的默认值,这样的话我们往往会忽略掉里面参数的含义,如果业务场景要求比较苛刻的话,存在资源耗尽的风险;另外采 ThreadPoolExecutor 的方式可以让我们更加清楚地了解线程池的运行规则,不管是面试还是对技术成长都有莫大的好处。
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 各个方法的弊端:
- newFixedThreadPool 和 newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
- newCachedThreadPool 和 newScheduledThreadPool: 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
10.说一下 CTL?
ctl 变量是整个线程池的核心控制状态,它是一个 AtomicInteger 类型的原子对象,它记录了线程池中生效线程数和线程池的运行状态。
- workerCount,生效的线程数,基本上可以理解为存活的线程数。
- runState,线程池运行状态。
ctl 总共 32 位,其中低 29 位代表 workerCount,所以最大线程数为 2^29^-1。高 3 位代表 runState。
runState 有 5 个值:
- RUNNING: 对应的高 3 位值是 111。接收新任务处理队列任务。
- SHUTDOWN:对应的高 3 位值是 000。不接收新任务,但处理队列任务。
- STOP:对应的高 3 位值是 001。不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
- TIDYING: 对应的高 3 位值是 010。所有任务都被终结,有效线程为 0,并触发 terminated()方法。
- TERMINATED :对应的高 3 位值是 011。当 terminated()方法执行结束。
状态转换过程:
- 当调用了 shutdown(),状态会从 RUNNING 变成 SHUTDOWN,不再接收新任务,此时会处理完队列里面的任务。
- 如果调用的是 shutdownNow(),状态会直接变成 STOP。
- 当线程或者队列都是空的时候,状态就会变成 TIDYING。
- 当 terminated()执行完的时候,就会变成 TERMINATED。
11.线程是如何被回收的
ThreadPoolExecutor 回收工作线程,一条线程 getTask()返回 null,就会被回收。
分两种场景:
-
未调用 shutdown() :
RUNNING 状态下全部任务执行完成的场景,线程数量大于 corePoolSize,线程超时阻塞,超时唤醒后 CAS 减少工作线程数,如果 CAS 成功,返回 null,线程回收。否则进入下一次循环。当工作者线程数量小于等于 corePoolSize,就可以一直阻塞了。 -
调用 shutdown():
,全部任务执行完成的场景,shutdown() 会向所有线程发出中断信号,这时有两种可能。
所有线程都在阻塞:
中断唤醒,进入循环,都符合第一个 if 判断条件,都返回 null,所有线程回收。
任务还没有完全执行完:
至少会有一条线程被回收。在 processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用 tryTerminate(),向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。
12.创建多少线程合适?
我们从线程的应用场景来分析,由于 IO 操作比 Cpu 计算耗时要久的多的,如果我们一段程序有 IO 操作和 Cpu 计算,我们可以调用 IO 密集型计算。程序中没有 IO 操作只有 Cpu 的话称为 Cpu 密集型程序。
Cpu密集型:
Cpu 的核数=线程数就行,一般我们会设置 Cpu 核数+ 1,防止由于其他因素导致阻塞。
IO密集型:
确定在 IO 密集型计算中创建多少线程合适是一个复杂的问题,因为它涉及到多个因素,例如计算机的硬件配置、任务的性质和操作系统的特性。IO 密集型任务通常涉及大量的输入/输出操作,例如读写文件、网络通信等,而不是 CPU 密集型任务,这些任务主要涉及大量的计算。
在 IO 密集型任务中,线程通常会在等待 IO 操作完成时被阻塞,而不是在 CPU 上执行计算。因此,创建过多的线程可能会导致线程切换开销增加,从而导致性能下降。同时,创建过少的线程可能导致 CPU 资源得不到充分利用,从而造成性能浪费。
一般来说,建议的线程数量取决于以下几个因素:
- CPU 核心数:通常建议创建与 CPU 核心数相当数量的线程,这可以充分利用 CPU 资源,并避免过多的线程切换开销。
- IO 操作的类型和数量:如果 IO 操作非常耗时并且较多,可以考虑创建稍多于 CPU 核心数的线程,以便在等待 IO 时可以切换到其他线程执行任务。
- 内存:每个线程都需要一定的内存资源,过多的线程可能导致内存占用过大,影响系统的稳定性和性能。
- 操作系统的调度策略:不同的操作系统在线程调度方面有不同的策略,这也会影响合适的线程数量。
一种常见的做法是,首先根据 CPU 核心数来确定线程池的大小,然后根据实际的性能测试进行调优。可以逐渐增加线程数量,并监测系统性能的变化,找到最佳的线程数量。
值得注意的是,如果任务中有长时间的阻塞 IO 操作,也可以考虑使用异步 IO 或者事件驱动的编程模型,以减少线程数量并提高系统的吞吐量。
综上所述,IO 密集型任务的合适线程数量没有固定的标准,需要根据具体情况进行评估和调优。在实际应用中,可以进行性能测试和监测,找到最佳的线程数量来提高系统性能。
13.自定义线程池
- 600l 是 600,其中 l 是 long 的简写
- 使用阻塞队列,并设置最大容量为 1_000_000
- 自定义拒绝策略进行补偿机制处理
private ExecutorService service = new ThreadPoolExecutor(5,
5,
6001,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1_000_000),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 补偿机制
}
});
拒绝策略有4种:
- AbortPolicy(默认):
ThreadPoolExecutor.AbortPolicy()
是默认的拒绝策略。当线程池无法接受新任务时,会抛出RejectedExecutionException
异常。 - DiscardPolicy:
ThreadPoolExecutor.DiscardPolicy()
是另一种简单的拒绝策略。当线程池无法接受新任务时,新任务会被丢弃,不会抛出异常。 - DiscardOldestPolicy:
ThreadPoolExecutor.DiscardOldestPolicy()
是一种稍微高级一点的策略。当线程池无法接受新任务时,会丢弃队列中最老的任务,然后尝试重新提交新任务。 - CallerRunsPolicy:
ThreadPoolExecutor.CallerRunsPolicy()
。当线程池无法接受新任务时,它会将任务交给调用线程来执行。
- 点赞
- 收藏
- 关注作者
评论(0)