如何在Java中使用ForkJoinPool

举报
千锋教育 发表于 2023/07/20 16:09:57 2023/07/20
【摘要】 使用 ForkJoinPool 分解计算密集型任务并并行执行它们,以获得更好的 Java 应用程序性能。ForkJoinPool是一个功能强大的 Java 类,用于处理计算密集型任务。它的工作原理是将任务分解为更小的子任务,然后并行执行它们。该线程池使用分治策略进行操作,这使其能够同时执行任务,从而提高吞吐量并减少处理时间。其独特功能之一ForkJoinPool是它用于优化性能的工作窃取算法...

使用 ForkJoinPool 分解计算密集型任务并并行执行它们,以获得更好的 Java 应用程序性能。

ForkJoinPool是一个功能强大的 Java 类,用于处理计算密集型任务。它的工作原理是将任务分解为更小的子任务,然后并行执行它们。该线程池使用分治策略进行操作,这使其能够同时执行任务,从而提高吞吐量并减少处理时间。

其独特功能之一ForkJoinPool是它用于优化性能的工作窃取算法。当工作线程完成分配的任务时,它会从其他线程窃取任务,确保所有线程高效工作,不会浪费计算机资源。

ForkJoinPool广泛用于 Java 的并行流和CompletableFutures中,允许开发人员轻松地并发执行任务。此外, KotlinAkka等其他 JVM 语言也使用此框架来构建需要高并发性和弹性的消息驱动应用程序。

使用 ForkJoinPool 进行线程池

该类ForkJoinPool存储了workers,它们是在机器的每个CPU核心上运行的进程。每个进程都存储在deque中,它代表双端队列。一旦工作线程用完任务,它就会开始从其他工作线程窃取任务。

首先会有任务分叉的过程;这意味着一个大任务将被分解为可以并行执行的较小任务。一旦所有子任务完成,它们就会重新加入。然后该类ForkJoinPool会提供一个结果,如图 1 所示。

Java 中 ForkJoinPool 的示意图。国际数据集团

图 1. 运行中的 ForkJoinPool

当任务在a中提交时ForkJoinPool,进程将被分成更小的进程并推送到共享队列中。

一旦fork()调用该方法,就会并行调用任务,直到基本条件为真。一旦处理被分叉,该join()方法就会确保线程相互等待,直到进程完成。


所有任务最初都会提交到主队列,该主队列将任务推送到工作线程。请注意,任务是使用 LIFO(后进先出)策略插入的,该策略与堆栈数据结构相同。

另一个重要的一点是ForkJoinPool使用双端队列来存储任务。这使得能够使用 LIFO 或 FIFO(先进先出),这对于工作窃取算法是必需的。

ForkJoinPool 双端队列图国际数据集团

图 2. ForkJoinPool 使用 Deques 存储任务

工作窃取算法

工作窃取ForkJoinPool是一种有效的算法,它通过平衡池中所有可用线程的工作负载来有效利用计算机资源。

当线程变得空闲时,它将尝试从仍忙于分配的工作的其他线程中窃取任务,而不是保持不活动状态。此过程最大限度地提高了计算资源的利用率,并确保没有线程负担过重而其他线程保持空闲。

工作窃取算法背后的关键概念是每个线程都有自己的任务双队列,并按 LIFO 顺序执行。

当一个线程完成自己的任务并变得空闲时,它会尝试从另一个线程的双端队列末尾“窃取”任务,遵循先进先出的策略,与队列数据结构相同。这使得空闲线程能够拾取等待时间最长的任务,从而减少总体等待时间并增加吞吐量。

在下图中,线程 2 通过轮询线程 1 双端队列的最后一个元素,从线程 1 窃取任务,然后执行该任务。被盗任务通常是双端队列中最旧的任务,这确保了工作负载在池中的所有线程之间均匀分配。

ForkJoinPool 中的线程国际数据集团

图 3. ForkJoinPool 的工作窃取算法的图示

总体而言,ForkJoinPool工作窃取算法是一项强大的功能,可以通过确保有效利用所有可用计算资源来显着提高并行应用程序的性能。

ForkJoinPool的主要类

让我们快速浏览一下支持使用 ForkJoinPool 进行处理的主要类。

  • ForkJoinPool:创建线程池以使用ForkJoin框架。它的工作原理与其他线程池类似。这个类中最重要的方法是commonPool()创建ForkJoin线程池。
  • RecursiveAction:这个类的主要功能是计算递归动作。请记住,在该compute()方法中,我们不返回值。这是因为递归发生在compute()方法内。
  • RecursiveTask:此类的工作方式与 类似RecursiveAction,不同之处在于该compute()方法将返回一个值。

使用递归操作

要使用这些RecursiveAction功能,我们需要继承它并重写该compute()方法。然后,我们使用要实现的逻辑创建子任务。

在下面的代码示例中,我们将并行且递归地计算数组中每个数字的两倍。我们只能并行计算两个乘两个数组元素。

正如您所看到的,该fork()方法调用了该compute()方法。一旦整个数组得到每个元素的总和,递归调用就会停止。一旦数组的所有元素被递归求和,我们就会显示结果。

清单 1. RecursiveAction 的示例


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinDoubleAction {

  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    int[] array = {1, 5, 10, 15, 20, 25, 50};
    DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length);

    // Invokes compute method
    forkJoinPool.invoke(doubleNumberTask);
    System.out.println(DoubleNumber.result);
  }
}

class DoubleNumber extends RecursiveAction {

  final int PROCESS_THRESHOLD = 2;
  int[] array;
  int startIndex, endIndex;
  static int result;

  DoubleNumber(int[] array, int startIndex, int endIndex) {
    this.array = array;
    this.startIndex = startIndex;
    this.endIndex = endIndex;
  }

  @Override
  protected void compute() {
    if (endIndex - startIndex <= PROCESS_THRESHOLD) {
      for (int i = startIndex; i < endIndex; i++) {
        result += array[i] * 2;
      }
    } else {
      int mid = (startIndex + endIndex) / 2;
      DoubleNumber leftArray = new DoubleNumber(array, startIndex, mid);
      DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex);

      // Invokes the compute method recursively
      leftArray.fork();
      rightArray.fork();

      // Joins results from recursive invocations
      leftArray.join();
      rightArray.join();
    }
  }
}

此计算的输出为 252。

要记住的重要一点RecursiveAction是它不返回值。还可以通过使用分而治之的策略来分解流程来提高性能。

这就是我们在 清单 1 中所做的,我们不是计算每个数组元素的双精度,而是通过将数组分成几部分来并行计算。

还需要注意的是,RecursiveAction当用于可以有效分解为更小的子问题的任务时,这是最有效的。

因此,RecursiveActionForkJoinPool应该用于计算密集型任务,其中工作的并行化可以带来显着的性能改进。否则,由于线程的创建和管理,性能会更差。

递归任务

在下一个示例中,让我们探索一个简单的程序,该程序在中间递归地中断,直到达到基本条件。在本例中,我们使用该类RecursiveTask

RecursiveAction和的区别RecursiveTask在于,RecursiveTask,我们可以在方法中返回一个值compute()

清单 2. RecursiveTask 的示例


import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumArrayTask extends RecursiveTask<Integer> {

  private final List<Integer> numbers;

  public ForkJoinSumArrayTask(List<Integer> numbers) {
    this.numbers = numbers;
  }

  @Override
  protected Integer compute() {
    if (numbers.size() <= 2) {
      return numbers.stream().mapToInt(e -> e).sum();
    } else {
      int mid = numbers.size() / 2;
      List<Integer> list1 = numbers.subList(0, mid);
      List<Integer> list2 = numbers.subList(mid, numbers.size());
 
      ForkJoinSumArrayTask task1 = new ForkJoinSumArrayTask(list1);
      ForkJoinSumArrayTask task2 = new ForkJoinSumArrayTask(list2);

      task1.fork();

      return task1.join() + task2.compute();
    }
  }

  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();

    List<Integer> numbers = List.of(1, 3, 5, 7, 9);
    int output = forkJoinPool.invoke(new ForkJoinSumArrayTask(numbers));

    System.out.println(output);
  }
}

在这里,我们递归地分解中间的数组,直到达到基本条件。

一旦我们破坏了主数组,我们将list1和发送list2ForkJoinSumArrayTask,然后我们分叉task1,它compute()并行执行该方法和数组的其他部分。

一旦递归过程达到基本条件, join就会调用该方法,连接结果。

本例中的输出为 25。

何时使用 ForkJoinPool

ForkJoinPool不应该在所有情况下使用。如前所述,最好将其用于高度密集的并发进程。我们来看看具体是什么情况:

  • 递归任务ForkJoinPool非常适合执行递归算法,例如快速排序、合并排序或二分搜索。这些算法可以分解为更小的子问题并并行执行,这可以显着提高性能。
  • 尴尬的并行问题:如果您有一个可以轻松划分为独立子任务的问题,例如图像处理或数值模拟,您可以使用ForkJoinPool并行执行子任务。
  • 高并发场景:在高并发场景中,例如Web服务器、数据处理管道或其他高性能应用程序,您可以使用ForkJoinPool跨多个线程并行执行任务,这有​​助于提高性能和吞吐量。

概括

在本文中,您了解了如何使用最重要的ForkJoinPool功能在单独的 CPU 内核中执行繁重的操作。让我们总结一下本文的要点:

  • ForkJoinPool是一个使用分而治之策略递归执行任务的线程池。
  • Kotlin 和 Akka 等 JVM 语言使用它来构建消息驱动的应用程序。
  • ForkJoinPool并行执行任务,从而有效利用计算机资源。
  • 工作窃取算法通过允许空闲线程从繁忙线程窃取任务来优化资源利用率。
  • 任务存储在双端队列中,存储采用LIFO策略,窃取采用先进先出策略。
  • 框架中的主要类ForkJoinPool包括ForkJoinPoolRecursiveActionRecursiveTask
    • RecursiveAction用于计算递归操作并且不返回任何值。
    • RecursiveTask类似但返回一个值。
    • compute()方法在两个类中都被重写以实现自定义逻辑。
    • fork()方法调用该compute()方法并将任务分解为更小的子任务。
    • join()方法等待子任务完成并合并其结果。
    • ForkJoinPool通常与并行流 和 一起使用CompletableFuture
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。