Java多线程 ThreadPoolExecutor-RejectedExecutionHandler拒绝执行策略

举报
Yeats_Liao 发表于 2022/10/18 23:19:39 2022/10/18
【摘要】 一、说明RejectedExecutionHandler当线程池已经被关闭,或者任务数超过maximumPoolSize+workQueue时执行拒绝策略ThreadPoolExecutor.AbortPolicy 默认拒绝策略,丢弃任务并抛出RejectedExecutionException异常ThreadPoolExecutor.DiscardPolicy 直接丢弃任务,但不抛出异常...

一、说明

RejectedExecutionHandler

  • 当线程池已经被关闭,或者任务数超过maximumPoolSize+workQueue时执行拒绝策略
  • ThreadPoolExecutor.AbortPolicy 默认拒绝策略,丢弃任务并抛出RejectedExecutionException异常
  • ThreadPoolExecutor.DiscardPolicy 直接丢弃任务,但不抛出异常
  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃任务队列最先加入的任务,再执行execute方法把新任务加入队列执行
  • ThreadPoolExecutor.CallerRunsPolicy:由创建了线程池的线程来执行被拒绝的任务

二、理解

AbortPolicy

默认拒绝策略,丢弃任务并抛出RejectedExecutionException异常

    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    public static class AbortPolicy implements RejectedExecutionHandler {r
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

DiscardPolicy

直接丢弃任务,但不抛出异常

public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

DiscardOldestPolicy

丢弃队列中等待最久的任务,再把新任务添加进去执行,从任务队列弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

CallerRunsPolicy

会调用当前线程池的所在的线程去执行被拒绝的任务

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

三、实现

1.AbortPolicy

创建 ThreadPoolExecutorTest类,默认使用ThreadPoolExecutor.AbortPolicy拒绝策略,队列是ArrayBlockingQueue,设置核心线程数最大值为1,线程池线程数最大值为2,最大等待时间为5秒,等待队列值为2

public class RejectedExecutionHandlerTest {
    public static void main(String[] args) throws InterruptedException {
        // 1.创建自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());


        // 2.创建线程任务
        for (int i = 1; i <= 6; i++) {

            // 3.执行任务
            System.out.println("执行第"+i+"个任务");
            threadPoolExecutor.execute(new runnable("任务"+i));

            System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize());
            System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize());
            // 4.迭代器获取等待队列
            Iterator iterator = threadPoolExecutor.getQueue().iterator();
            System.out.print("当前等待队列 ");
            while (iterator.hasNext()){
                runnable thread = (runnable) iterator.next();
                System.out.print(thread.name + "\t");
            }
            System.out.print("\n");
            System.out.println("--------");
        }

        Thread.sleep(10000);
        System.out.println("----休眠10秒后----");
        System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize());
        System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize());
        System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());

        // 5.关闭线程池
        threadPoolExecutor.shutdown();
    }
        // 实现Runnable
    static class runnable implements Runnable{
        // 设置任务名
        String name;
        public runnable(String setName) {
            this.name = setName;
        }
        @Override
        public void run() {
            try {
                System.out.println("线程:"+Thread.currentThread().getName() +" 执行: "+name);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

当线程数达到corePoolSize后,若有新任务加入,则直接进入任务队列等待,超出队列的任务会创建新的线程来执行

一共有1个核心,当线程数超过corePoolSize+workQueue时,将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收
在这里插入图片描述

但如果再执行1个任务,线程数超过maximumPoolSize+workQueue,再提交任务将被丢弃并抛出RejectedExecutionException异常
在这里插入图片描述

2.DiscardPolicy

创建5个任务,让被线程池拒绝的任务直接丢弃,不会抛异常也不会执行

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());

任务5不会执行,恶意不会抛出异常,超时的非核心线程将被回收

在这里插入图片描述

3.DiscardOldestPolicy

丢弃任务队列最先加入的任务,再执行execute方法把新任务加入队列执行

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

添加任务5时,线程数已经超过maximumPoolSize+workQueue,抛弃最先加入队列的任务2并且不执行,再将任务5加进队列中执行
在这里插入图片描述

4.CallerRunsPolicy

会调用当前线程池的所在的线程去执行被拒绝的任务

        // 1.创建自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

主线程执行任务1,空闲线程执行任务4,此时队列中有任务2和任务3

添加任务5时,线程数已经超过maximumPoolSize+workQueue,任务5直接调用当前线程池的所在的线程main去执行,这时主线程被阻塞了

当任务5执行完成时,最先的两个任务已经完成了,主线程去执行任务2和任务3,添加任务6也可以直接执行

在这里插入图片描述

超时的非核心线程将被回收
在这里插入图片描述

5.自定义拒绝执行策略

当线程数已经超过maximumPoolSize+workQueue时,调用新线程去执行任务

    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            new Thread(r, "新线程"+(new Random().nextInt(4) + 1)).start();
        }
    }
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new MyRejectedExecutionHandler());

在这里插入图片描述
在这里插入图片描述

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。