java并发包之ExecutorCompletionService、Exchanger、Phaser使用

举报
小米粒-biubiubiu 发表于 2021/05/28 16:32:22 2021/05/28
【摘要】 ExecutorCompletionService类 当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取: 方式一: 通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在...

ExecutorCompletionService类

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

  • 方式一:

通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 


  
  1. static class Task implements Callable<String> {
  2. private int i;
  3. public Task(int i) {
  4. this.i = i;
  5. }
  6. @Override
  7. public String call() throws Exception {
  8. Thread.sleep(1000);
  9. return Thread.currentThread().getName() + "执行完任务" + i;
  10. }
  11. }

  
  1. private static int numThread = 5;
  2. //手动创建线程池
  3. ThreadPoolExecutor threadPoolExecutor =
  4. new ThreadPoolExecutor(5, 15, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(30));
  5. List<Future<String>> resultList = new ArrayList<>(5);
  6. for (int i = 1; i <= numThread; i++) {
  7. Future<String> future = threadPoolExecutor.submit(new Task(i));
  8. resultList.add(future);
  9. }
  10. while (numThread > 0) {
  11. if (!CollectionUtils.isEmpty(resultList)) {
  12. for (Future<String> future : resultList) {
  13. String result = null;
  14. try {
  15. result = future.get(0, TimeUnit.SECONDS);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. } catch (ExecutionException e) {
  19. e.printStackTrace();
  20. } catch (TimeoutException e) {
  21. // 超时异常直接忽略
  22. }
  23. if (!StringUtils.isEmpty(result)) {
  24. resultList.remove(future);
  25. numThread--;
  26. System.out.println(result);
  27. // 此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
  28. break;
  29. }
  30. }
  31. }
  32. }
  • 方式二:

第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。


  
  1. //手动创建线程池
  2. ThreadPoolExecutor threadPoolExecutor =
  3. new ThreadPoolExecutor(5, 15, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(30));
  4. ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(threadPoolExecutor);
  5. for (int i = 1; i <= numThread; i++) {
  6. executorCompletionService.submit(new Task(i));
  7. }
  8. for(int i=1;i<=numThread;i++){
  9. try {
  10. System.out.println(executorCompletionService.take().get());
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } catch (ExecutionException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. threadPoolExecutor.shutdown();

 运行结果:

Exchanger类

Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。其定义为 Exchanger<V> 泛型类型,其中 V 表示可交换的数据类型,对外提供的接口很简单,具体如下:

  • Exchanger():无参构造方法。

  • V exchange(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

  • V exchange(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。

可以看出,当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。


  
  1. static class Producer extends Thread {
  2. private Exchanger<Integer> exchanger;
  3. private static int data = 0;
  4. Producer(String name, Exchanger<Integer> exchanger) {
  5. super("Producer-" + name);
  6. this.exchanger = exchanger;
  7. }
  8. @Override
  9. public void run() {
  10. for (int i=1; i<5; i++) {
  11. try {
  12. TimeUnit.SECONDS.sleep(1);
  13. data = i;
  14. System.out.println(getName()+" 交换前:" + data);
  15. data = exchanger.exchange(data);
  16. System.out.println(getName()+" 交换后:" + data);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. }
  23. static class Consumer extends Thread {
  24. private Exchanger<Integer> exchanger;
  25. private static int data = 0;
  26. Consumer(String name, Exchanger<Integer> exchanger) {
  27. super("Consumer-" + name);
  28. this.exchanger = exchanger;
  29. }
  30. @Override
  31. public void run() {
  32. while (true) {
  33. data = 0;
  34. System.out.println(getName()+" 交换前:" + data);
  35. try {
  36. TimeUnit.SECONDS.sleep(1);
  37. data = exchanger.exchange(data);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. System.out.println(getName()+" 交换后:" + data);
  42. }
  43. }
  44. }

  
  1. public static void main(String[] args) throws InterruptedException {
  2. Exchanger<Integer> exchanger = new Exchanger<Integer>();
  3. new Producer("", exchanger).start();
  4. new Consumer("", exchanger).start();
  5. TimeUnit.SECONDS.sleep(7);
  6. System.exit(-1);
  7. }

运行结果: 

 Phaser 类

java多线程技术提供了Phaser工具类,Phaser表示“阶段器”,用来解决控制多个线程分阶段共同完成任务的情景问题。其作用相比CountDownLatch和CyclicBarrier更加灵活,例如有这样的一个题目:5个学生一起参加考试,一共有三道题,要求所有学生到齐才能开始考试,全部同学都做完第一题,学生才能继续做第二题,全部学生做完了第二题,才能做第三题,所有学生都做完的第三题,考试才结束。分析这个题目:这是一个多线程(5个学生)分阶段问题(考试考试、第一题做完、第二题做完、第三题做完),所以很适合用Phaser解决这个问题。


  
  1. package concurrent.phaser;
  2. import java.util.concurrent.Phaser;
  3. /**
  4. * 比赛阶段器
  5. */
  6. public class GamePhaser extends Phaser {
  7. /**
  8. * 当一个阶段的所有线程都到达时 , 执行该方法, 此时 phase自动加1
  9. * @param phase
  10. * @param registeredParties
  11. * @return
  12. */
  13. @Override
  14. protected boolean onAdvance(int phase, int registeredParties) {
  15. switch (phase) {
  16. case 0 :
  17. System.out.println("预赛完成");
  18. return false;
  19. case 1:
  20. System.out.println("初赛完成");
  21. return false;
  22. case 2:
  23. System.out.println("决赛完成");
  24. return false;
  25. default:
  26. return true;
  27. }
  28. }
  29. }

 


  
  1. package concurrent.phaser;
  2. import java.util.concurrent.Phaser;
  3. /**
  4. * 运动员类
  5. */
  6. public class Runner implements Runnable {
  7. private Phaser phaser;
  8. public Runner(Phaser phaser) {
  9. this.phaser = phaser;
  10. }
  11. @Override
  12. public void run() {
  13. /**
  14. * 参加预赛
  15. */
  16. System.out.println("选手-"+Thread.currentThread().getName()+":参加预赛");
  17. /**
  18. * 预赛阶段-----执行这个方法的话会等所有的选手都完成了之后再继续下面的方法
  19. */
  20. phaser.arriveAndAwaitAdvance();
  21. /**
  22. * 参加初赛
  23. */
  24. System.out.println("选手-"+Thread.currentThread().getName()+":参加初赛");
  25. /**
  26. * 初赛阶段-----执行这个方法的话会等所有的选手都完成了之后再继续下面的方法
  27. */
  28. phaser.arriveAndAwaitAdvance();
  29. /**
  30. * 参加决赛
  31. */
  32. System.out.println("选手-"+Thread.currentThread().getName()+":参加决赛");
  33. /**
  34. * 决赛阶段-----执行这个方法的话会等所有的选手都完成了之后再继续下面的方法
  35. */
  36. phaser.arriveAndAwaitAdvance();
  37. }
  38. }

 


  
  1. package concurrent.phaser;
  2. /**
  3. * 比赛开始
  4. */
  5. public class RunnerGame {
  6. public static void main(String[] args) {
  7. int runnerNum = 4;
  8. GamePhaser gamePhaser = new GamePhaser();
  9. /**
  10. * 注册一次表示phaser维护的线程个数
  11. */
  12. gamePhaser.register();
  13. for (int i = 0; i < runnerNum; i++ ) {
  14. /**
  15. * 注册一次表示phaser维护的线程个数
  16. */
  17. gamePhaser.register();
  18. new Thread(new Runner(gamePhaser)).start();
  19. }
  20. /**
  21. * 后续阶段主线程就不参加了
  22. */
  23. gamePhaser.arriveAndDeregister();
  24. }
  25. }

运行结果:

选手-Thread-0:参加预赛
选手-Thread-1:参加预赛
选手-Thread-3:参加预赛
选手-Thread-2:参加预赛
预赛完成
选手-Thread-2:参加初赛
选手-Thread-0:参加初赛
选手-Thread-3:参加初赛
选手-Thread-1:参加初赛
初赛完成
选手-Thread-1:参加决赛
选手-Thread-2:参加决赛
选手-Thread-0:参加决赛
选手-Thread-3:参加决赛
决赛完成

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/108995394

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。