ForkJoin线程池的学习和思考

举报
breakDawn 发表于 2021/11/08 16:51:11 2021/11/08
【摘要】 ForkJoin线程池在常规的java书籍里还是提到比较少的,毕竟是java8引入的产物。首先这里简单解释一下forkJoin的运作原理, 本质上有点像归并计算。他会将提交大任务按照一定规则拆解(fork)成多个小任务当任务小到一定程度时,就会执行计算执行完成时会和其他的小任务进行合并(join), 逐步将所有小结果合成一个大结果。可以看这个forkJoinTask的实现伪代码,即如果想使用...

ForkJoin线程池在常规的java书籍里还是提到比较少的,毕竟是java8引入的产物。

首先这里简单解释一下forkJoin的运作原理, 本质上有点像归并计算。

  1. 他会将提交大任务按照一定规则拆解(fork)成多个小任务
  2. 当任务小到一定程度时,就会执行计算
  3. 执行完成时会和其他的小任务进行合并(join), 逐步将所有小结果合成一个大结果。
    image.png

可以看这个forkJoinTask的实现伪代码,即如果想使用forkJoin并发执行任务,需要自己把任务继承RecursiveTask,作为forkJoin池的submit对象:


public class ForkJoinTask extends RecursiveTask<任务参数> {
 

	public ReckonTask(任务参数) {

	} 
 
	@Override
	protected File compute() {
		if(根据任务参数判断任务是否足够小) {
        计算,返回
      } else {
      	 拆分成子任务1和子任务2 
        任务1.fork();
        任务2.fork();
        结果1 = 任务1.join();
        结果2 = 任务2.join();
        返回结果1+结果2}
	}
}

然后实际上整个forkjoin的细节非常多,这里我通过给自己提好几个问题,来逐步理解forkJoin的原理,


Q: forkJoin中各个线程是如何获取那些小任务的呢?
A:
他是通过工作密取的方式获取。(java并发那本书里提到过工作密取workSteal,原来是用在这了)

  • 假设我们给forkJoin设置3个工作线程,那么就会有3个工作队列, 注意,这个队列是双端队列。
  • 每当执行任务时,如果不满足小任务的条件,他会fork出2个子任务,并push进自己的工作队列中。
  • 每个工作线程不断取自己队头的任务执行。
  • 关键点:如果自己队列里没有数据,则会从其他队列的队尾取数据。

Q: fork时具体发生了什么?
A:
是一个异步的操作, 就是向当前线程队列中添加这个fork出来任务,能放进去的话就返回,不会等待。
注意,默认fork出的任务是先默认给自己的。 当自己做不完时,才可能被别人取走!
image.png


Q: join是什么含义?什么时候做的?
A:
见实现forkJoin任务接口时的代码:
image.png

可以看到时每次fork完之后, 通过join,来获取子task的结果,获取到之后,再合并计算,返回结果。


Q: join这个阻塞过程是怎么做的?如果把线程挂起,那这个线程岂不是无法工作了?
A:

首先,之前fork时,新的子任务已经被放入队列了。
每个子任务都有一个任务状态。
当调用该子任务的join时, 会循环判断他的状态

如果这个子任务状态未完成, 则从自身队列或其他人的队列中取出新的任务执行,因此进入了下一层的exec()操作。
如果发现子任务状态更新为了完成(这个更新动作可能是自己线程完成的,也可能是别的线程完成的,反正这个任务的状态实现了同步和可见), 则将结果返回给上层。
因此join的本质是一个递归的过程, 任务没完成的话,他就取其他任务继续递归往下执行。

更详细的可以看这个链接fork+join过程详细解读


Q: forkJoin存放任务的时候,怎么保证不会出现并发问题?比如同时往队尾插入的话
A:

  • n个工作线程是通过数组存放的(即有一个工作线程数组)
  • sun.misc.Unsafe操作类直接基于操作系统控制层在硬件层面上进行原子操作,它是ForkJoinPool高效性能的一大保证,类似的编程思路还体现在java.util.concurrent包中相当规模的类功能实现中。
    image.png

Q: forkJoin应用在哪吗?
A:
java8 stream的parallel并发功能就是基于forkJoin做的, parallelStream实现的forkJoin拆解任务和执行任务的接口, 默认用机器所有CPU数量的forkJoin线程池。
如果需要限制线程数量,可以用
new forkJoin(线程数).submit(()->(list.stream().parallel().map()…)); 即可

关于java8和forkJoin究竟是如何配合的,可以看这个链接:
源码级别学习java8并行流执行原理
https://www.cnblogs.com/Dorae/p/7779246.html

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。