使用 Fork Join 框架

举报
kronchan 发表于 2021/01/23 23:58:30 2021/01/23
【摘要】 前言当需要执行大量的小任务的时候,我们需要将多个小任务进行拆分,类似 快速排序 的 分而治之。Fork 将一个大任务进行递归,不断的把它切割成符合条件的小任务,然后这些子任务分配到不同 CPU 核心上去分别计算,提高效率,Join 是 获取到小任务的计算结果,最后合并返回。它充分利用了现在多核 CPU 的性能。<!--more-->正文Fork/Join框架的核心类是ForkJoinPool...

前言

当需要执行大量的小任务的时候,我们需要将多个小任务进行拆分,类似 快速排序分而治之

Fork 将一个大任务进行递归,不断的把它切割成符合条件的小任务,然后这些子任务分配到不同 CPU 核心上去分别计算,提高效率,Join 是 获取到小任务的计算结果,最后合并返回。

它充分利用了现在多核 CPU 的性能。


<!--more-->

正文

Fork/Join框架的核心类是ForkJoinPool,它能够接收一个ForkJoinTask,并得到计算结果。ForkJoinTask有两个子类,RecursiveTask(有返回值)和RecursiveAction(无返回结果),我们自己定义任务时,只需选择这两个类继承即可。

RecursiveAction

 // 没有返回值的 fork / join 任务框架
 public class PrintTask extends RecursiveAction {
     private static final int THRESHOLD = 5;
     private int start;
     private int end;
 
     PrintTask(int start, int end) {
         this.start = start;
         this.end = end;
    }
 
     public static void main(String[] args) {
         PrintTask task = new PrintTask(0, 25);
         // 分配四个线程给它
         ForkJoinPool pool = new ForkJoinPool(4);
         pool.execute(task);
         pool.shutdown();
    }
 
     @Override
     protected void compute() {
         if (THRESHOLD >= (end - start)) {
             // 满足小任务条件,分配打印任务
             for (int i = start; i < end; i++) {
                 System.out.println(Thread.currentThread().getName() + ": " + i);
            }
        } else {
             // 任务还能继续拆分
             int division = (start + end) >> 1;
             PrintTask task1 = new PrintTask(start, division);
             PrintTask task2 = new PrintTask(division, end);
             invokeAll(task1, task2);
        }
    }
 }

RecursiveTask<T>

 // 有返回值的 fork / join 任务框架 RecursiveTask<T>
 public class SumTask extends RecursiveTask<Integer> {
     private static final int THRESHOLD = 50;
     private int start;
     private int end;
 
     public SumTask(int start, int end) {
         this.start = start;
         this.end = end;
    }
 
     public static void main(String[] args) throws InterruptedException {
         final int works = 200;
         SumTask task = new SumTask(0, works);
         ForkJoinPool forkJoinPool = new ForkJoinPool(4);
         long beginTime = System.currentTimeMillis();
         int result = forkJoinPool.invoke(task);
         long consumeTime = System.currentTimeMillis() - beginTime;
         System.out.println("Fork Join calculated the result is " + result + ",consume " + consumeTime + "ms");
         forkJoinPool.shutdown();
 
         result = 0;
         beginTime = System.currentTimeMillis();
         for (int i = 0; i < works; i++) {
             TimeUnit.SECONDS.sleep(1);
             result += i;
        }
         consumeTime = System.currentTimeMillis() - beginTime;
         System.out.println("The correct result is " + result + ",consume " + consumeTime + "ms");
    }
 
     @Override
     protected Integer compute() {
         int sum = 0;
         if (THRESHOLD >= (end - start)) {
             for (int i = start; i < end; i++) {
                 try {
                     TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
                 sum += i;
            }
             System.out.println(Thread.currentThread().getName() + " result is :" + sum);
             return sum;
        } else {
             int division = (start + end) >> 1;
             SumTask task1 = new SumTask(start, division);
             SumTask task2 = new SumTask(division, end);
             invokeAll(task1, task2);
             int result1 = task1.join();
             int result2 = task2.join();
             return result1 + result2;
        }
    }
 }
  • 结果:

 ForkJoinPool-1-worker-1 result is :1225
 ForkJoinPool-1-worker-0 result is :8725
 ForkJoinPool-1-worker-3 result is :3725
 ForkJoinPool-1-worker-2 result is :6225
 Fork Join calculated the result is 19900,consume 50018ms
 The correct result is 19900,consume 200053ms

总结

  1. 我们虽然可以通过调整阈值 THRESHOLD 控制子任务的大小,从而控制了任务的数量,但是我们分配的 Fork/Join Pool 数量却是根据 CPU 性能而定的,所以,切割任务的大小和数量需要进行预先计算好,不是子任务越多就越好,而且合并结果集,需要消耗其他的计算性能。如果任务大小不能控制,可以设计可伸缩算法,动态来计算出合理的阈值,以符合要求。

  2. 注意正确的 Fork/Join 框架的写法,通过廖老师的文章 Java的Fork/Join任务,你写对了吗? ,指出了错误的写法的问题所在。

源码地址

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。