ForkJoin框架的分支合并计算
1.ForkJoin的定义
ForkJoin是在JDK1.7提供的框架,ForkJoin的框架的基本思想是分而治之,并行执行,由大化小,即使用ForkJoin将相同的计算任务通过多线程的进行执行,从而能提高数据的计算速度(将一个大任务通过一个自定义的临界值分解成子任务,然后将子任务完成后会汇总结算,完成这个大任务,其中子任务还可以继续通过自定义的临界值再进行分解,分解后将所有分解执行结果汇总计算到子任务处,最后将所有子任务再汇总到一起计算)。
举个例子1:任务A分解成子任务A1和子任务A2,一起执行,执行结束后将A1和A2的结果汇总到一起。
举个例子2:任务A分解成子任务A1和子任务A2,一起执行,这时候发现子任务还是数据量还是有点大那就继续分解,将子任务A1分解为A11和A12,将子任务A2分解为A21和A22,此时是A11和A12和A21和A22一起执行,然后当A11和A12执行结束后会汇总结算将汇总结果归纳到子任务A1,然后当A21和A22执行结束后会汇总结算将汇总结果归纳到子任务A2,最后执行结束后将子任务A1和子任务A2的结果汇总到一起。
2.工作窃取
在ForkJoin任务执行的过程中,大任务分解成多个子任务的时候,每个子任务的处理时间都不一样,执行快的会将执行慢的任务窃取过来执行,从而提高效率(举个例子:将任务平均分给两个小伙子,一个小伙子很牛逼很快执行完了,而另一个小伙子还在吭哧吭哧的执行中,这时候第一个小伙子会将第二个小伙子的任务拿来执行,好人一生平安啊,整体任务的执行完成的时间就大大的提高了)。
3.ForkJoin的使用
使用ForkJoin框架,需要创建一个ForkJoin的任务,其中ForkJoin框架为我们提供了RecursiveAction和RecursiveTask两个抽象类。我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现其中的compute抽象方法就可以完成创建。
注:RecursiveTask中的compute计算方法是有返回值的,而RecursiveAction中的compute计算方法是没有返回值的,源码如下图所示:
3.1 RecursiveAction中递归事件的compute计算方法
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* The main computation performed by this task.
*/
protected abstract void compute();
3.2 RecursiveTask递归任务是需要结果的所以其中的compute计算方法要有返回值
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
* @return the result of the computation
*/
protected abstract V compute();
3.3 compute方法的定义ForkJoin的步骤
3.3.1 增加判断
即通过临界值划分任务,小于某个数据量的时候按正常执行,大于某个数据量的时候通过ForkJoin的方式执行;
3.3.2 在大于某个数据量的判断中执行ForkJoin
3.3.2.1 设置中间值middle将所有数据分成两份,分而治之,并行执行;
3.3.2.2 创建子任务1(即定义的继承RecursiveTask抽象类并实现compute方法的类的对象)和子任务2(即定义的继承RecursiveTask抽象类并实现compute方法的类的对象)
3.3.2.3 拆分任务,将任务1和任务2通过.fork()方法分别压入线程队列;
3.3.24 将拆分的任务汇总合并,通过任务1.join()+任务2.join()方法进行汇总结算。
3.4 小案例的三种实现方式
3.4.1 小案例需求:求和1到1000000000的和。
3.4.2 基础平A
通过for循环进行求和;
public static void test1(){
Long sum = 0L;
Long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum += i;
}
Long end = System.currentTimeMillis();
System.out.println("sum= "+ sum+",时间: "+(end - start));
}
结果是:sum= 500000000500000000,时间: 5728
3.4.3 ForkJoin分而治之
注:实现ForkJoin必须先创建一个ForkJoinPool池子,然后使用用池子来执行;
public static void test2() throws ExecutionException, InterruptedException {
Long start = System.currentTimeMillis();
//实现ForkJoin必须先创建一个ForkJoinPool池子,然后使用用池子来执行
ForkJoinPool forkJoinPool = new ForkJoinPool();
//task本质是继承的RecursiveTask,而RecursiveTask本质是ForkJoinTask
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> resultTask = forkJoinPool.submit(task);
Long sum = resultTask.get();
Long end = System.currentTimeMillis();
System.out.println("sum= "+ sum+",时间: "+(end - start));
}
结果是:sum= 500000000500000000,时间: 4952
3.4.3.1 ForkJoinPool对象.submit异步提交是有结果返回值的;
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
3.4.3.2 ForkJoinPool对象. excute同步执行是没有结果的没有返回值的;
3.4.4 Stream并行流
涉及到的方法有:
3.4.4.1 range()方法是两边都不包含(),rangeClose()方法是包含后面(]
3.4.4.2 parallel()实行并行计算
3.4.4.3 reduce将结果拿出来
public static void test3(){
Long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
Long end = System.currentTimeMillis();
System.out.println("sum= "+ sum+",时间: "+(end - start));
}
结果是:sum= 500000000500000000,时间: 157
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
- 点赞
- 收藏
- 关注作者
评论(0)