线程池和计时器的实现
1. 线程池
在之前我们写的代码中,用到线程就创建,用完之后线程就消失了,这样会浪费操作系统的资源,也存在一些弊端,通过线程池就可以解决这个问题
线程池是一种线程使用模式,它维护着多个线程,等待着监督管理者分配可并发执行的任务
线程池的核心原理:
- 创建一个空的线程池
- 提交任务时,线程会创建新的线程对象,任务分配完毕,线程归还给线程池,下次再提交任务时,不需要创建新的线程,直接复用已有的线程即可
- 如果提交任务时,线程池中没有空闲线程,也无法创建新的线程,任务就会排队等待
执行的很多代码逻辑都是要用户态的代码和内核态的代码配合完成,而应用程序又有很多,这些应用程序都是由内核统一负责管理和服务,所以内核中的工作可能是非常繁忙的,也就导致了提交给内核的工作可能是不可控的,因此,通常认为,纯用户态操作,就比经过内核的操作效率更高。
Java标准库里也提供了线程池的实现类 ThreadPoolExecutor
下面是四种构造方法,
接下来介绍一下第四个构造方法中的参数都表示什么:
int corePoolSize : 核心线程数 (核心线程会始终存在于线程池内部,非核心线程,繁忙的时候被创建出来,空闲时就释放掉)
int maximumPoolSize : 最大线程数【(核心线程数+非核心线程数)的最大值】
long keepAliveTime: 非核心线程允许空闲的最大时间(超过这个时间就会被释放)
TimeUnit unit: 非核心线程允许空闲的最大时间的单位
BlockingQueue < Runnable > workQueue : 工作队列(线程池的工作过程就是一个典型的“生产者消费者模型”,这里的队列就可以指定容量和类型)
ThreadFactory threadFactory: 线程工厂(Thread类的工厂类,通过这个类,完成Thread的实例创建和初始化操作,此处的 threadFactory就可以针对线程池里的线程进行批量的设置属性),一般情况下使用 Executors.defaultThreadFactory()即可
RejectedExecutionHandler handler : 拒绝策略
解释:核心线程就是指一直存在于线程池中的线程,两个空闲时间就是值,创建出来的临时线程空闲的时间,超过这个时间就意味着这靠核心线程就足以完成当前提交的任务,就需要销毁临时线程,节约资源,要执行的任务过多时的解决方案指的是,当前线程池中线程的数量已经达到了最大,并且阻塞队列也已经排满了,就需要把多出来的任务踢出去
下面是四种拒绝策略的描述:
不断地提交任务,会有以下三个临界点:
-
当核心线程满了之后,再提交任务就会排队
-
当核心线程和阻塞队列都满了之后,就会创建临时线程
-
当核心线程,阻塞队列,临时线程都满了之后,会触发任务的拒绝策略
1.1. 线程池的使用
Executors是一个工具类,对ThreadPoolExecutor进行了封装,提供了一些常用的方法:
先来演示一下:
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(4);
for (int i = 0; i < 100; i++) {
service.submit(()->{
Thread current = Thread.currentThread();
System.out.println(current.getName() + ": " + i);
});
}
}
此时发现 i 出现了错误,其实还是之前一直提到的变量捕获的问题,这里换种解决方法:
每次都创建一个局部变量来接收,就可以了
但是此时还有一个问题,运行程序之后发现程序并没有终止
是因为线程池创建出来的线程默认是“前台线程”,虽然main线程结束了,但是这些线程池的前台线程仍然是存在的,可以使用shutdown方法来把所有的线程终止掉,不过也不能立即终止,要确保所有的任务都执行完毕之后再终止,所以可以再添加一个sleep方法
public class ThreadDemo22 {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(4);
for (int i = 0; i < 100; i++) {
int id = i;
service.submit(()->{
Thread current = Thread.currentThread();
System.out.println(current.getName() + ": " + id);
});
}
//确保所有任务都执行完毕
Thread.sleep(1000);
//终止所有线程
service.shutdown();
}
}
1.2. 自定义线程池
根据线程池的特性,还可以模拟实现一个固定线程数目的线程池
先来简单的实现一下:
class MyThreadPoll{
private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
public MyThreadPoll(int n){
//创建n个线程
for(int i = 0;i < n;i++){
Thread thread = new Thread(()->{
while (true){
//循环从队列中取出任务
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.start();
}
}
//提交任务
public void submit(Runnable runnable){
try {
queue.put(runnable);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
上面的自定义线程池中,在构造方法里通过for循环来不断地创建线程,通过向阻塞队列中获取线程,并调用run方法执行任务,在提交任务时就是把任务加入到队列中
当然,上面的线程池是比较简陋的, 一些扩容,拒绝策略等功能还没有实现。
2. 定时器
2.1. 定时器的使用
定时器就相当于是一个闹钟,时间到了之后开始执行某个任务,Java中的定时器是TimerTask类和Timer类,TimerTash是一个抽象类,用于表示一个可以被定时器执行的任务,Timer类用于安排TimerTask
在指定的时间执行,下面来演示一下:
public class ThreadDemo24 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello");
}
},2000);
System.out.println("程序开始执行...");
}
}
也可以多定几个任务:
public class ThreadDemo24 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello3");
}
},3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello2");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello1");
}
},1000);
System.out.println("程序开始执行...");
}
}
2.2. 定时器的实现
模拟一个定时器的实现需要以下步骤:
- 创建类,描述一个要执行的任务是什么(任务的内容,任务的时间)
- 管理多个任务(通过一个合适的数据结构把任务存储起来)
- 有专门的线程执行任务
先来描述一下任务:
class MyTimerTask implements Comparable<MyTimerTask> {
//任务
private Runnable runnable;
//时间,通过毫秒时间戳来表示这个任务具体什么时候执行
private long time;
public MyTimerTask(Runnable runnable, long dely) {
this.runnable = runnable;
this.time = System.currentTimeMillis() + dely;
}
public void run() {
runnable.run();
}
public long getTime() {
return time;
}
@Override
public int compareTo(MyTimerTask o) {
return (int) (this.time - o.time);
}
}
接下来就是管理任务
关于用什么数据结构来存储这些任务,可以来分析一下,如果说使用顺序表或者链表这样的结构可行吗,这样的话每次查找执行那个任务都需要遍历一遍,效率低下,所以说可以使用堆来存储,可以很好的找到最小值,只需要判断最小的那个任务是否到时间了即可
由于创建的堆是自定义类型,所以说MyTimeTash类还需要实现Comparable接口,重写compareTo方法
再来看MyTimer类:
class MyTimer {
private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();
private Object lock = new Object();
public MyTimer() {
//创建线程,来负责执行任务
Thread t = new Thread(() -> {
while (true) {
synchronized (lock) {
if (queue.isEmpty()) {
continue;
}
MyTimerTask current = queue.peek();
//需要执行任务的情况
if (System.currentTimeMillis() >= current.getTime()) {
current.run();
//执行过的任务需要从队列中删除
queue.poll();
} else {
continue;
}
}
}
});
//开启线程执行任务
t.start();
}
public void schedule(Runnable runnable, long dely) {
synchronized (lock) {
MyTimerTask myTimerTask = new MyTimerTask(runnable, dely);
queue.offer(myTimerTask);
}
}
}
创建线程之后,不断地从队列中取出任务来执行,执行完的任务要从队列中移除
同时,为了解决线程安全问题,还需要把出队和入队的操作都加上锁
但是,上面的代码还是存在几个潜在的问题的:
- 初始情况下,队列为空,由于continue的存在就会在循环中会反复地加锁释放锁,显然是不行的,所以可以采用等待唤醒机制来修改:
这里吧continue改为wait会更好一点
唤醒的时机就是每次添加任务的时候:
- 如果说当前时间是10:30,定的时间是12:00,那么期间一个半小时走的都是else分支,还是会不断地加锁
所以说这里也需要一个等待,不过可以不用像刚开始那样一直等,因为这时候肯定是知道要等多少时间的,直接采用有参版本就行
q : 如果使用 sleep 可以吗
a : 不可以
- wait可以被唤醒,如果提前加入了一个任务,就需要唤醒线程来执行任务,但是sleep不能唤醒
- sleep是抱着锁睡的,其他线程拿不到锁(1中任务也不能添加)
最后还有一个问题:关于为什么不使用PriorityBlockingQueue
由于PriorityBlockingQueue自身实现的方法本来就已经带锁了,这样就出现了锁的嵌套,就容易出现死锁的问题
下面是优化版的完整代码:
class MyTimerTask implements Comparable<MyTimerTask> {
//任务
private Runnable runnable;
//时间,通过毫秒时间戳来表示这个任务具体什么时候执行
private long time;
public MyTimerTask(Runnable runnable, long dely) {
this.runnable = runnable;
this.time = System.currentTimeMillis() + dely;
}
public void run() {
runnable.run();
}
public long getTime() {
return time;
}
@Override
public int compareTo(MyTimerTask o) {
return (int) (this.time - o.time);
}
}
class MyTimer {
private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();
private Object lock = new Object();
public MyTimer() {
//创建线程,来负责执行任务
Thread t = new Thread(() -> {
while (true) {
try {
synchronized (lock) {
if (queue.isEmpty()) {
wait();
}
MyTimerTask current = queue.peek();
//需要执行任务的情况
if (System.currentTimeMillis() >= current.getTime()) {
current.run();
//执行过的任务需要从队列中删除
queue.poll();
} else {
lock.wait(current.getTime() - System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//开启线程执行任务
t.start();
}
public void schedule(Runnable runnable, long dely) {
synchronized (lock) {
MyTimerTask myTimerTask = new MyTimerTask(runnable, dely);
queue.offer(myTimerTask);
lock.notify();
}
}
}
public class ThreadDemo25 {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(() -> {
System.out.println("hello1");
}, 1000);
myTimer.schedule(() -> {
System.out.println("hello2");
}, 2000);
myTimer.schedule(() -> {
System.out.println("hello3");
}, 3000);
System.out.println("开始执行");
}
}
- 点赞
- 收藏
- 关注作者
评论(0)