线程池和计时器的实现

举报
2的n次方 发表于 2024/10/27 08:28:40 2024/10/27
【摘要】 1. 线程池在之前我们写的代码中,用到线程就创建,用完之后线程就消失了,这样会浪费操作系统的资源,也存在一些弊端,通过线程池就可以解决这个问题线程池是一种线程使用模式,它维护着多个线程,等待着监督管理者分配可并发执行的任务线程池的核心原理:创建一个空的线程池提交任务时,线程会创建新的线程对象,任务分配完毕,线程归还给线程池,下次再提交任务时,不需要创建新的线程,直接复用已有的线程即可如果提...

1. 线程池

在之前我们写的代码中,用到线程就创建,用完之后线程就消失了,这样会浪费操作系统的资源,也存在一些弊端,通过线程池就可以解决这个问题

线程池是一种线程使用模式,它维护着多个线程,等待着监督管理者分配可并发执行的任务

线程池的核心原理:

  1. 创建一个空的线程池
  2. 提交任务时,线程会创建新的线程对象,任务分配完毕,线程归还给线程池,下次再提交任务时,不需要创建新的线程,直接复用已有的线程即可
  3. 如果提交任务时,线程池中没有空闲线程,也无法创建新的线程,任务就会排队等待

执行的很多代码逻辑都是要用户态的代码和内核态的代码配合完成,而应用程序又有很多,这些应用程序都是由内核统一负责管理和服务,所以内核中的工作可能是非常繁忙的,也就导致了提交给内核的工作可能是不可控的,因此,通常认为,纯用户态操作,就比经过内核的操作效率更高。

Java标准库里也提供了线程池的实现类 ThreadPoolExecutor

下面是四种构造方法,

接下来介绍一下第四个构造方法中的参数都表示什么:

int corePoolSize : 核心线程数 (核心线程会始终存在于线程池内部,非核心线程,繁忙的时候被创建出来,空闲时就释放掉)

int maximumPoolSize 最大线程数【(核心线程数+非核心线程数)的最大值】

long keepAliveTime: 非核心线程允许空闲的最大时间(超过这个时间就会被释放)

TimeUnit unit: 非核心线程允许空闲的最大时间的单位

BlockingQueue < Runnable > workQueue 工作队列(线程池的工作过程就是一个典型的“生产者消费者模型”,这里的队列就可以指定容量和类型)

ThreadFactory threadFactory: 线程工厂(Thread类的工厂类,通过这个类,完成Thread的实例创建和初始化操作,此处的 threadFactory就可以针对线程池里的线程进行批量的设置属性),一般情况下使用 Executors.defaultThreadFactory()即可

RejectedExecutionHandler handler 拒绝策略

解释:核心线程就是指一直存在于线程池中的线程,两个空闲时间就是值,创建出来的临时线程空闲的时间,超过这个时间就意味着这靠核心线程就足以完成当前提交的任务,就需要销毁临时线程,节约资源,要执行的任务过多时的解决方案指的是,当前线程池中线程的数量已经达到了最大,并且阻塞队列也已经排满了,就需要把多出来的任务踢出去

下面是四种拒绝策略的描述:

不断地提交任务,会有以下三个临界点:

  1. 当核心线程满了之后,再提交任务就会排队

  2. 当核心线程和阻塞队列都满了之后,就会创建临时线程

  3. 当核心线程,阻塞队列,临时线程都满了之后,会触发任务的拒绝策略

1.1. 线程池的使用

Executors是一个工具类,对ThreadPoolExecutor进行了封装,提供了一些常用的方法:

先来演示一下:

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 100; i++) {
        service.submit(()->{
            Thread current = Thread.currentThread();
            System.out.println(current.getName() + ": " + i);
        });
    }
}

此时发现 i 出现了错误,其实还是之前一直提到的变量捕获的问题,这里换种解决方法:

每次都创建一个局部变量来接收,就可以了

但是此时还有一个问题,运行程序之后发现程序并没有终止

是因为线程池创建出来的线程默认是“前台线程”,虽然main线程结束了,但是这些线程池的前台线程仍然是存在的,可以使用shutdown方法来把所有的线程终止掉,不过也不能立即终止,要确保所有的任务都执行完毕之后再终止,所以可以再添加一个sleep方法

public class ThreadDemo22 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++) {
            int id = i;
            service.submit(()->{
                Thread current = Thread.currentThread();
                System.out.println(current.getName() + ": " + id);
            });
        }
        //确保所有任务都执行完毕
        Thread.sleep(1000);
        //终止所有线程
        service.shutdown();
    }
}

1.2. 自定义线程池

根据线程池的特性,还可以模拟实现一个固定线程数目的线程池

先来简单的实现一下:

class MyThreadPoll{
    private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
    public MyThreadPoll(int n){
        //创建n个线程
        for(int i = 0;i < n;i++){
            Thread thread = new Thread(()->{
                while (true){
                    //循环从队列中取出任务
                    try {
                        Runnable runnable = queue.take();
                        runnable.run();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            thread.start();
        }
    }
    //提交任务
    public void submit(Runnable runnable){
        try {
            queue.put(runnable);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

上面的自定义线程池中,在构造方法里通过for循环来不断地创建线程,通过向阻塞队列中获取线程,并调用run方法执行任务,在提交任务时就是把任务加入到队列中

当然,上面的线程池是比较简陋的, 一些扩容,拒绝策略等功能还没有实现。

2. 定时器

2.1. 定时器的使用

定时器就相当于是一个闹钟,时间到了之后开始执行某个任务,Java中的定时器是TimerTask类和Timer类,TimerTash是一个抽象类,用于表示一个可以被定时器执行的任务,Timer类用于安排TimerTask在指定的时间执行,下面来演示一下:

public class ThreadDemo24 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello");
            }
        },2000);
        System.out.println("程序开始执行...");
    }
}

也可以多定几个任务:

public class ThreadDemo24 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello3");
            }
        },3000);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello2");
            }
        },2000);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello1");
            }
        },1000);
        System.out.println("程序开始执行...");
    }
}

2.2. 定时器的实现

模拟一个定时器的实现需要以下步骤:

  1. 创建类,描述一个要执行的任务是什么(任务的内容,任务的时间)
  2. 管理多个任务(通过一个合适的数据结构把任务存储起来)
  3. 有专门的线程执行任务

先来描述一下任务:

class MyTimerTask implements Comparable<MyTimerTask> {
    //任务
    private Runnable runnable;
    //时间,通过毫秒时间戳来表示这个任务具体什么时候执行
    private long time;

    public MyTimerTask(Runnable runnable, long dely) {
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + dely;
    }

    public void run() {
        runnable.run();
    }

    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTimerTask o) {
        return (int) (this.time - o.time);
    }
}

接下来就是管理任务

关于用什么数据结构来存储这些任务,可以来分析一下,如果说使用顺序表或者链表这样的结构可行吗,这样的话每次查找执行那个任务都需要遍历一遍,效率低下,所以说可以使用堆来存储,可以很好的找到最小值,只需要判断最小的那个任务是否到时间了即可

由于创建的堆是自定义类型,所以说MyTimeTash类还需要实现Comparable接口,重写compareTo方法

再来看MyTimer类:

class MyTimer {
    private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();
    private Object lock = new Object();

    public MyTimer() {
        //创建线程,来负责执行任务
        Thread t = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    if (queue.isEmpty()) {
                        continue;
                    }
                    MyTimerTask current = queue.peek();
                    //需要执行任务的情况
                    if (System.currentTimeMillis() >= current.getTime()) {
                        current.run();
                        //执行过的任务需要从队列中删除
                        queue.poll();
                    } else {
                        continue;
                    }
                }
            }
        });
        //开启线程执行任务
        t.start();
    }

    public void schedule(Runnable runnable, long dely) {
        synchronized (lock) {
            MyTimerTask myTimerTask = new MyTimerTask(runnable, dely);
            queue.offer(myTimerTask);
        }
    }
}

创建线程之后,不断地从队列中取出任务来执行,执行完的任务要从队列中移除

同时,为了解决线程安全问题,还需要把出队和入队的操作都加上锁

但是,上面的代码还是存在几个潜在的问题的:

  1. 初始情况下,队列为空,由于continue的存在就会在循环中会反复地加锁释放锁,显然是不行的,所以可以采用等待唤醒机制来修改:

这里吧continue改为wait会更好一点

唤醒的时机就是每次添加任务的时候:

  1. 如果说当前时间是10:30,定的时间是12:00,那么期间一个半小时走的都是else分支,还是会不断地加锁

所以说这里也需要一个等待,不过可以不用像刚开始那样一直等,因为这时候肯定是知道要等多少时间的,直接采用有参版本就行

q : 如果使用 sleep 可以吗

a : 不可以

  1. wait可以被唤醒,如果提前加入了一个任务,就需要唤醒线程来执行任务,但是sleep不能唤醒
  2. sleep是抱着锁睡的,其他线程拿不到锁(1中任务也不能添加)

最后还有一个问题:关于为什么不使用PriorityBlockingQueue

由于PriorityBlockingQueue自身实现的方法本来就已经带锁了,这样就出现了锁的嵌套,就容易出现死锁的问题

下面是优化版的完整代码:

class MyTimerTask implements Comparable<MyTimerTask> {
    //任务
    private Runnable runnable;
    //时间,通过毫秒时间戳来表示这个任务具体什么时候执行
    private long time;

    public MyTimerTask(Runnable runnable, long dely) {
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + dely;
    }

    public void run() {
        runnable.run();
    }

    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTimerTask o) {
        return (int) (this.time - o.time);
    }
}

class MyTimer {
    private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();
    private Object lock = new Object();

    public MyTimer() {
        //创建线程,来负责执行任务
        Thread t = new Thread(() -> {
            while (true) {
                try {
                    synchronized (lock) {
                        if (queue.isEmpty()) {
                            wait();
                        }
                        MyTimerTask current = queue.peek();
                        //需要执行任务的情况
                        if (System.currentTimeMillis() >= current.getTime()) {
                            current.run();
                            //执行过的任务需要从队列中删除
                            queue.poll();
                        } else {
                            lock.wait(current.getTime() - System.currentTimeMillis());
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        //开启线程执行任务
        t.start();
    }

    public void schedule(Runnable runnable, long dely) {
        synchronized (lock) {
            MyTimerTask myTimerTask = new MyTimerTask(runnable, dely);
            queue.offer(myTimerTask);
            lock.notify();
        }
    }
}

public class ThreadDemo25 {
    public static void main(String[] args) {
        MyTimer myTimer = new MyTimer();
        myTimer.schedule(() -> {
            System.out.println("hello1");
        }, 1000);
        myTimer.schedule(() -> {
            System.out.println("hello2");
        }, 2000);
        myTimer.schedule(() -> {
            System.out.println("hello3");
        }, 3000);
        System.out.println("开始执行");
    }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。