ForkJoin框架的分支合并计算

举报
多米诺的古牌 发表于 2021/09/28 17:13:24 2021/09/28
【摘要】 1.ForkJoin的定义ForkJoin是在JDK1.7提供的框架,ForkJoin的框架的基本思想是分而治之,并行执行,由大化小,即使用ForkJoin将相同的计算任务通过多线程的进行执行,从而能提高数据的计算速度(将一个大任务通过一个自定义的临界值分解成子任务,然后将子任务完成后会汇总结算,完成这个大任务,其中子任务还可以继续通过自定义的临界值再进行分解,分解后将所有分解执行结果汇总计...

1.ForkJoin的定义

ForkJoin是在JDK1.7提供的框架,ForkJoin的框架的基本思想是分而治之,并行执行,由大化小,即使用ForkJoin将相同的计算任务通过多线程的进行执行,从而能提高数据的计算速度(将一个大任务通过一个自定义的临界值分解成子任务,然后将子任务完成后会汇总结算,完成这个大任务,其中子任务还可以继续通过自定义的临界值再进行分解,分解后将所有分解执行结果汇总计算到子任务处,最后将所有子任务再汇总到一起计算)。

举个例子1:任务A分解成子任务A1和子任务A2,一起执行,执行结束后将A1和A2的结果汇总到一起。

举个例子2:任务A分解成子任务A1和子任务A2,一起执行,这时候发现子任务还是数据量还是有点大那就继续分解,将子任务A1分解为A11和A12,将子任务A2分解为A21和A22,此时是A11和A12和A21和A22一起执行,然后当A11和A12执行结束后会汇总结算将汇总结果归纳到子任务A1,然后当A21和A22执行结束后会汇总结算将汇总结果归纳到子任务A2,最后执行结束后将子任务A1和子任务A2的结果汇总到一起。

2.工作窃取

在ForkJoin任务执行的过程中,大任务分解成多个子任务的时候,每个子任务的处理时间都不一样,执行快的会将执行慢的任务窃取过来执行,从而提高效率(举个例子:将任务平均分给两个小伙子,一个小伙子很牛逼很快执行完了,而另一个小伙子还在吭哧吭哧的执行中,这时候第一个小伙子会将第二个小伙子的任务拿来执行,好人一生平安啊,整体任务的执行完成的时间就大大的提高了)。

3.ForkJoin的使用

使用ForkJoin框架,需要创建一个ForkJoin的任务,其中ForkJoin框架为我们提供了RecursiveAction和RecursiveTask两个抽象类。我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现其中的compute抽象方法就可以完成创建。

注:RecursiveTask中的compute计算方法是有返回值的,而RecursiveAction中的compute计算方法是没有返回值的,源码如下图所示:

3.1 RecursiveAction中递归事件的compute计算方法

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

    /**
     * The main computation performed by this task.
     */
    protected abstract void compute();

3.2 RecursiveTask递归任务是需要结果的所以其中的compute计算方法要有返回值

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

3.3 compute方法的定义ForkJoin的步骤

3.3.1 增加判断 

即通过临界值划分任务,小于某个数据量的时候按正常执行,大于某个数据量的时候通过ForkJoin的方式执行;

3.3.2 在大于某个数据量的判断中执行ForkJoin

3.3.2.1 设置中间值middle将所有数据分成两份,分而治之,并行执行;

3.3.2.2 创建子任务1(即定义的继承RecursiveTask抽象类并实现compute方法的类的对象)和子任务2(即定义的继承RecursiveTask抽象类并实现compute方法的类的对象)

3.3.2.3 拆分任务,将任务1和任务2通过.fork()方法分别压入线程队列;

3.3.24 将拆分的任务汇总合并,通过任务1.join()+任务2.join()方法进行汇总结算。

3.4 小案例的三种实现方式

3.4.1 小案例需求:求和1到1000000000的和。

3.4.2 基础平A

通过for循环进行求和;

public static void test1(){
    Long sum = 0L;
    Long start = System.currentTimeMillis();
    for (Long i = 1L; i <= 10_0000_0000L; i++) {
        sum += i;
    }
    Long end = System.currentTimeMillis();
    System.out.println("sum= "+ sum+",时间: "+(end - start));
}

结果是:sum= 500000000500000000,时间: 5728

3.4.3 ForkJoin分而治之

注:实现ForkJoin必须先创建一个ForkJoinPool池子,然后使用用池子来执行;

public static void test2() throws ExecutionException, InterruptedException {
    Long start = System.currentTimeMillis();
    //实现ForkJoin必须先创建一个ForkJoinPool池子,然后使用用池子来执行
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    //task本质是继承的RecursiveTask,而RecursiveTask本质是ForkJoinTask
    ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
    ForkJoinTask<Long> resultTask = forkJoinPool.submit(task);
    Long sum = resultTask.get();
    Long end = System.currentTimeMillis();
    System.out.println("sum= "+ sum+",时间: "+(end - start));
}

结果是:sum= 500000000500000000,时间: 4952

3.4.3.1 ForkJoinPool对象.submit异步提交是有结果返回值的;

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

3.4.3.2 ForkJoinPool对象. excute同步执行是没有结果的没有返回值的;

3.4.4 Stream并行流

涉及到的方法有:

3.4.4.1 range()方法是两边都不包含(),rangeClose()方法是包含后面(]
3.4.4.2 parallel()实行并行计算
3.4.4.3 reduce将结果拿出来

public static void test3(){
    Long start = System.currentTimeMillis();
    long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
    Long end = System.currentTimeMillis();
    System.out.println("sum= "+ sum+",时间: "+(end - start));
}

结果是:sum= 500000000500000000,时间: 157

public void execute(ForkJoinTask<?> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。