java 并发编程学习笔记(八)线程池

举报
小米粒-biubiubiu 发表于 2020/12/03 01:23:31 2020/12/03
【摘要】                                              线程池  (1)new Thread 弊端 每次new  Thred 新建对象,性能差线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者oom缺少更多功能,如更多执行,定期执行,线程中断 (2)线程池的好处 重在存在的线程,减少对象的创建,消亡的开...

                                             线程池 

(1)new Thread 弊端

  • 每次new  Thred 新建对象,性能差
  • 线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者oom
  • 缺少更多功能,如更多执行,定期执行,线程中断

(2)线程池的好处

  • 重在存在的线程,减少对象的创建,消亡的开销,性能差
  • 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
  • 提供定时执行,定期执行,单线程,并发数控制等功能

(3)线程池 ThreadPoolExecutor

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。

线程池:Java中开辟出了一种管理线程的概念,这个概念叫做线程池,从概念以及应用场景中,我们可以看出,线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗。那么,我们应该如何创建一个线程池呢?

Java中已经提供了创建线程池的一个类:Executor而我们创建时,一般使用它的子类:


  
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }

 

  • execute (): 提交 任务,交给线程池执行
  • submit():提交任务,能够返回执行结果 execute+future
  • shutdown():关闭线程池,等待任务都执行完
  • shutdownNow (): 关闭线程池,不等待任务执行完
  • getTaskCount (): 线程池已经执行和未执行的任务总数
  • getCompletedTaskCount (): 已完成的任务数量
  • getPoolSize () : 线程池当前的线程数量
  • getActiveCount (): 当前线程池中正在执行任务的线程数量

这是其中最重要的一个构造方法,

这个方法决定了创建出来的线程池的各种属性,下面依靠一张图来更好的理解线程池和这几个参数:又图中,我们可以看出,

线程池中的corePoolSize就是线程池中的核心线程数量,这几个核心线程,只是在没有用的时候,也不会被回收。

maximumPoolSize就是线程池中可以容纳的最大线程的数量。

keepAliveTime,就是线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间,而util,就是计算这个时间的一个单位。

workQueue,就是等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出)。

threadFactory,就是创建线程的线程工厂。

最后一个handler,是一种拒绝策略,我们可以在任务满了知乎,拒绝执行某些任务。

线程池的执行流程又是怎样的呢?

有图我们可以看出,任务进来时,首先执行判断,判断核心线程是否处于空闲状态,如果不是,核心线程就先就执行任务,如果核心线程已满,则判断任务队列是否有地方存放该任务,若果有,就将任务保存在任务队列中,等待执行,如果满了,在判断最大可容纳的线程数,如果没有超出这个数量,就开创非核心线程执行任务,如果超出了,就调用handler实现拒绝策略。

  • handler的拒绝策略有四种:

第一种AbortPolicy:不执行新任务,直接抛出异常,提示线程池已满


  
  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates an {@code AbortPolicy}.
  4. */
  5. public AbortPolicy() { }
  6. /**
  7. * Always throws RejectedExecutionException.
  8. *
  9. * @param r the runnable task requested to be executed
  10. * @param e the executor attempting to execute this task
  11. * @throws RejectedExecutionException always
  12. */
  13. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  14. throw new RejectedExecutionException("Task " + r.toString() +
  15. " rejected from " +
  16. e.toString());
  17. }
  18. }

第二种DisCardPolicy:不执行新任务,也不抛出异常


  
  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates a {@code DiscardPolicy}.
  4. */
  5. public DiscardPolicy() { }
  6. /**
  7. * Does nothing, which has the effect of discarding task r.
  8. *
  9. * @param r the runnable task requested to be executed
  10. * @param e the executor attempting to execute this task
  11. */
  12. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  13. }
  14. }

第三种DisCardOldSetPolicy:将消息队列中的第一个任务替换为当前新进来的任务执行    


  
  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates a {@code DiscardOldestPolicy} for the given executor.
  4. */
  5. public DiscardOldestPolicy() { }
  6. /**
  7. * Obtains and ignores the next task that the executor
  8. * would otherwise execute, if one is immediately available,
  9. * and then retries execution of task r, unless the executor
  10. * is shut down, in which case task r is instead discarded.
  11. *
  12. * @param r the runnable task requested to be executed
  13. * @param e the executor attempting to execute this task
  14. */
  15. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  16. if (!e.isShutdown()) {
  17. e.getQueue().poll(); //将队列头部的任务移除掉
  18. e.execute(r); //执行当前新进来的任务
  19. }
  20. }
  21. }

第四种CallerRunsPolicy:直接调用execute来执行当前任务


  
  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates a {@code CallerRunsPolicy}.
  4. */
  5. public CallerRunsPolicy() { }
  6. /**
  7. * Executes task r in the caller's thread, unless the executor
  8. * has been shut down, in which case the task is discarded.
  9. *
  10. * @param r the runnable task requested to be executed
  11. * @param e the executor attempting to execute this task
  12. */
  13. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  14. if (!e.isShutdown()) {
  15. r.run(); //直接执行 当前新进来的任务
  16. }
  17. }
  18. }
  • 四种常见的线程池:

CachedThreadPool:可缓存的线程池,该线程池中没有核心线程,非核心线程的数量为Integer.max_value,就是无限大,当有需要时创建线程来执行任务,没有需要时回收线程,适用于耗时少,任务量大的情况。

SecudleThreadPool:周期性执行任务的线程池,按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大。适用于执行周期性的任务。

SingleThreadPool:只有一条线程来执行任务,适用于有顺序的任务的应用场景。

FixedThreadPool:定长的线程池,有核心线程,核心线程的即为最大的线程数量,没有非核心线程


  
  1. /**
  2. * fixedThreadPool 正规的线程池,由于核心池数 等于最大 池数,因此没有最大线程池,
  3. * 只有最大线程池 的线程才能被回收,因为没有最大线程池,所以无超时机制,
  4. * 队列大小无限制,除非线程池关闭了核心线程才会被回收,
  5. * 采用默认的AbortPolicy:不执行新任务,直接抛出异常,提示线程池已满
  6. * return new ThreadPoolExecutor(nThreads, nThreads,
  7. * 0L, TimeUnit.MILLISECONDS,
  8. * new LinkedBlockingQueue<Runnable>());
  9. *
  10. */
  11. ExecutorService service = Executors.newFixedThreadPool(10);
  12. /**
  13. * newCachedThreadPool
  14. * return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  15. * 60L, TimeUnit.SECONDS,
  16. * new SynchronousQueue<Runnable>());
  17. *
  18. * 只有非核心线程,最大线程数很大(Integer.MAX_VALUE),它会为每一个任务添加一个新的线程,
  19. * 这边有一个超时机制,当最大线程池中空闲的线程超过60s内没有用到的话,就会被回收。
  20. * 缺点就是没有考虑到系统的实际内存大小。
  21. * 采用默认的AbortPolicy:不执行新任务,直接抛出异常,提示线程池已满
  22. */
  23. service = Executors.newCachedThreadPool();
  24. /**
  25. * return new FinalizableDelegatedExecutorService
  26. * (new ThreadPoolExecutor(1, 1,
  27. * 0L, TimeUnit.MILLISECONDS,
  28. * new LinkedBlockingQueue<Runnable>()));
  29. * 看这个名字就知道这个家伙是只有一个核心线程,就是一个孤家寡人,
  30. *通过指定的顺序将任务一个个丢到线程,都乖乖的排队等待执行,不处理并发的操作,不会被回收。确定就是一个人干活效率慢。
  31. */
  32. Executors.newSingleThreadExecutor();
  33. /**
  34. * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  35. * new DelayedWorkQueue());
  36. * 这个线程池就厉害了,是唯一一个有延迟执行和周期重复执行的线程池。
  37. * 它的核心线程池固定,非核心线程的数量没有限制,但是闲置时会立即会被回收。
  38. */
  39. Executors.newScheduledThreadPool(5);

(4)线程池的简单使用


  
  1. @Slf4j
  2. public class ThreadPoolExample1 {
  3. public static void main(String[] args) {
  4. ExecutorService service = Executors.newCachedThreadPool();
  5. service = Executors.newFixedThreadPool(3);
  6. service = Executors.newSingleThreadExecutor();
  7. for (int i = 0; i < 10; i++) {
  8. final int index = i;
  9. service.execute(() -> {
  10. log.info("task {}", index);
  11. });
  12. }
  13. service.shutdown();
  14. //Timer 组件
  15. Timer timer = new Timer();
  16. timer.schedule(new TimerTask() {
  17. @Override
  18. public void run() {
  19. log.info("Timer组件的定时任务");
  20. }
  21. }, new Date());
  22. timer.schedule(new TimerTask() {
  23. @Override
  24. public void run() {
  25. log.info("");
  26. }
  27. }, 3);
  28. timer.schedule(new TimerTask() {
  29. @Override
  30. public void run() {
  31. log.info("开始时间");
  32. log.info("喵喵喵喵喵喵喵喵");
  33. log.info("结束时间");
  34. }
  35. }, 3000, 3000);
  36. timer.scheduleAtFixedRate(new TimerTask() {
  37. @Override
  38. public void run() {
  39. log.info("开始时间");
  40. log.info("喵喵喵喵喵喵喵喵");
  41. log.info("结束时间");
  42. }
  43. }, 3000, 3000);
  44. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
  45. //3秒之后执行
  46. scheduledExecutorService.schedule(() -> {
  47. log.info("啦啦啦啦!");
  48. },
  49. 3, TimeUnit.SECONDS);
  50. //3秒之后执行,前一个线程的执行开始时间 和 后一个 线程的执行开始时间相差2秒
  51. /**
  52. * command:执行线程
  53. * initialDelay:初始化延时
  54. * period:两次开始执行最小间隔时间
  55. * unit:计时单位
  56. */
  57. scheduledExecutorService.scheduleAtFixedRate(() -> {
  58. log.info("开始时间");
  59. log.info("啦啦啦啦啦啦啦啦啦!");
  60. log.info("结束时间");
  61. }, 3, 2, TimeUnit.SECONDS);
  62. //3秒之后执行,前一个线程的结束时间 和 后一个 线程的 开始时间相差2秒
  63. /**
  64. * command:执行线程
  65. * initialDelay:初始化延时
  66. * period:前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
  67. * unit:计时单位
  68. */
  69. scheduledExecutorService.scheduleWithFixedDelay(() -> {
  70. log.info("开始时间");
  71. log.info("啦啦啦啦啦啦啦啦啦!");
  72. log.info("结束时间");
  73. }, 3, 5, TimeUnit.SECONDS);
  74. }
  75. }

(5)线程池 合理配置

cpu 密集型任务,就需要尽量压榨cpu,参考值可以设置为NCPU+1

Io 密集型任务,参考值可以设置为2*NCPU

I/O 密集型(主要是读写):指的是系统的CPU效能相对硬盘/内存的效能要好很多,此时,系统运作,大部分的状况是 CPU 在等 I/O (硬盘/内存) 的读/写,此时 CPU Loading 不高 
CPU 密集型(主要是运算): 指的是系统的 硬盘/内存 效能 相对 CPU 的效能 要好很多,此时,系统运作,大部分的状况是 CPU Loading 100%,CPU 要读/写 I/O (硬盘/内存),I/O在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU Loading 很高。

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/84304276

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。