并发编程进阶-02

举报
kwan的解忧杂货铺 发表于 2024/08/10 12:01:35 2024/08/10
【摘要】 1.DelayQueue 的使用场景?在很多场景我们需要用到延时任务,比如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等,这些都可以使用延时任务来实现。关闭空闲连接.服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。缓存.缓存中的对象,超过了空闲时间,需要从缓存中移出。任务超时处理.在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。 2...

1.DelayQueue 的使用场景?

在很多场景我们需要用到延时任务,比如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等,这些都可以使用延时任务来实现。

  • 关闭空闲连接.服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

  • 缓存.缓存中的对象,超过了空闲时间,需要从缓存中移出。

  • 任务超时处理.在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

2.TransferQueue 的使用?

使用 TransferQueue 交替打印字符串

public class Juc_03_question_AbcAbc_06 {
  public static void main(String[] args){
    char[] aC = "ABC".toCharArray();
    char[] bC = "123".toCharArray();
    TransferQueue<Character> queue = new LinkedTransferQueue<>();
    new Thread(()-> {
      try {
        for (char c : aC){
          System.out.println(queue.take());
          queue.transfer(c);
        }
      } catch (InterruptedException e){
        e.printStackTrace();
      }
    },"t1").start();
    new Thread(()-> {
      try {
        for (char c : bC){
          queue.transfer(c);
          System.out.println(queue.take());
        }
      } catch (InterruptedException e){
        e.printStackTrace();
      }
    },"t2").start();
  }
}

为什么 SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue:

SynchronousQueue 无锁竞争,需要依据实际情况注意生产者线程和消费者线程的配比.

3.Fork/Join 框架原理?

Fork/Join 框架是 Java 中用于并行任务执行的一种框架,它基于"分治"(divide-and-conquer)的思想。Fork/Join 框架允许将一个大任务划分为多个小任务,然后并行地执行这些小任务,并最终将它们的结果合并起来得到最终的结果。

Fork/Join 框架的原理如下:

  1. 分解任务:在 Fork/Join 框架中,一个大任务会被逐步地拆分成多个小任务,直到这些小任务可以直接处理(通常是足够小到不可再拆分的大小)。这个过程称为"分解"(Forking)。
  2. 并行执行:一旦任务被成功地拆分成多个小任务,这些小任务就可以并行地在不同的处理器上执行。Fork/Join 框架通过工作窃取(Work-Stealing)算法来实现任务的动态调度。当一个线程完成了它所拥有的小任务后,它会尝试从其他线程的任务队列中"窃取"一个新的任务进行处理,以保持线程的高利用率。
  3. 合并结果:在并行执行的过程中,每个小任务都会产生一个局部结果。当所有小任务都完成后,这些局部结果将会被合并成整个大任务的最终结果。这个过程称为"合并"(Joining)。

Fork/Join 框架主要涉及以下两个关键类:

  • ForkJoinTask:这是一个抽象类,用于表示一个可以并行执行的任务。它有两个重要的子类:RecursiveTask用于有返回值的任务,RecursiveAction用于没有返回值的任务。
  • ForkJoinPool:这是 Fork/Join 框架的线程池,负责管理和调度任务的执行。它维护了一个工作队列和多个工作线程,以便高效地执行分解和合并任务。

在使用 Fork/Join 框架时,开发者需要继承RecursiveTaskRecursiveAction类,实现compute()方法,在compute()方法中将大任务分解成小任务,并实现任务的执行和结果合并逻辑。然后,将这些小任务提交给 Fork/Join 框架进行并行执行,最终得到任务的结果。Fork/Join 框架的自动任务调度和工作窃取算法能够有效地利用多核处理器的计算资源,提高并行任务执行的效率。

ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, ForkJoinTask 数组负责将存放程序提交给 ForkJoinPool 的任务,而 ForkJoinWorkerThread 数组负责执行这些任务。

4.fork 方法解读

当我们调用 ForkJoinTask 的 fork 方法时,程序会调用 ForkJoinWorkerThread 的 pushTask 方法异步地执行这个任务,然后立即返回结果.代码如下。

public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}
  1. 获取当前线程:
    • 首先,代码通过Thread.currentThread()方法获取当前正在执行fork()方法的线程对象,并将其赋值给变量t
  2. 判断当前线程类型:
    • 然后,代码通过instanceof关键字判断当前线程是否是ForkJoinWorkerThread的实例。
    • 如果当前线程是ForkJoinWorkerThread的实例,说明该线程是 Fork/Join 框架中的工作线程。
  3. 提交任务:
    • 如果当前线程是 Fork/Join 框架的工作线程,即ForkJoinWorkerThread的实例,那么代码会将当前任务this推送(push)到该工作线程所绑定的工作队列中。
    • 如果当前线程不是 Fork/Join 框架的工作线程,即是普通的 Java 线程,那么代码会通过ForkJoinPool.common.externalPush(this)方法将当前任务this提交给 Fork/Join 框架的公共池(commonPool)。
  4. 返回任务:
    • 最后,fork()方法会返回当前任务this,以便链式调用或其他后续操作。

总结:fork()方法用于将当前任务提交给 Fork/Join 框架进行并行执行。如果当前线程是 Fork/Join 框架的工作线程,任务会被推送到该工作线程的工作队列中;如果当前线程不是 Fork/Join 框架的工作线程,任务会被提交给 Fork/Join 框架的公共池。通过fork()方法,开发者可以将一个大任务拆分成子任务,实现任务的并行执行。

final void push(ForkJoinTask<?> task) {
  ForkJoinTask<?>[] a; ForkJoinPool p;
  int b = base, s = top, n;
  if ((a = array) != null) {    // ignore if queue removed
    int m = a.length - 1;     // fenced write for task visibility
    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    U.putOrderedInt(this, QTOP, s + 1);
    if ((n = s - b) <= 1) {
      if ((p = pool) != null)
        p.signalWork(p.workQueues, this);
    }
    else if (n >= m)
      growArray();
  }
}

5.join 方法解读

Join 方法的主要作用是阻塞当前线程并等待获取结果.让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下。

public final V join() {
  int s;
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    reportException(s);
  return getRawResult();
}
  • 首先,代码声明一个整型局部变量s,用于保存任务执行的状态。
  • 定义局部变量:
    • 首先,代码声明一个整型局部变量s,用于保存任务执行的状态。
  • 调用doJoin()方法:
    • 接下来,代码调用doJoin()方法,该方法实际上是ForkJoinTask类的一个抽象方法,需要在子类中实现。doJoin()方法用于实际等待任务的完成并获取其执行状态。
  • 获取执行状态:
    • doJoin()方法返回的是任务执行状态的值。通过位运算& DONE_MASK,将s的值与DONE_MASK(一个常量,表示任务状态的掩码)进行按位与运算,可以得到任务的实际执行状态。
  • 判断执行状态:
    • 如果执行状态s不等于NORMAL(其中NORMALForkJoinTask类中的一个常量,表示任务正常完成),则说明任务执行过程中出现了异常或被取消。在这种情况下,代码会调用reportException(s)方法,对异常进行处理和报告。
  • 返回结果:
    • 如果任务的执行状态s等于NORMAL,则说明任务已经正常完成。此时,代码调用getRawResult()方法,获取任务的执行结果,并将结果返回给调用者。

总结:join()方法用于等待当前任务的执行结果。它通过调用doJoin()方法获取任务的执行状态,判断任务是否正常完成。如果任务正常完成,则调用getRawResult()方法获取任务的执行结果并返回;如果任务执行过程中出现异常或被取消,则通过reportException(s)方法对异常进行处理。通过join()方法,可以实现对任务执行结果的获取和等待。

再来分析一下 doJoin()方法的实现代码

private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  return (s = status) < 0 ? s :
  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
  wt.pool.awaitJoin(w, this, 0L) :
  externalAwaitDone();
}

在 doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行.如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

6.说说 java 中的原子操作类?

常用的有 13 个原子操作类,都是通过 cas 实现的.

Java 从 JDK1.5 开始提供了 java.util.concurrent.atomic 包(以下简称 Atomic 包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式.因为变量的类型有很多种

  • Atomic 包里一共提供了 13 个类
  • 属于 4 种类型的原子更新方式
    • 原子更新基本类型、
    • 原子更新数组、
    • 原子更新引用
    • 原子更新属性(字段).
  • Atomic 包里的类基本都是使用 Unsafe 实现的包装类。

atomic 提供了 3 个类用于原子更新基本类型:

  • AtomicInteger 原子更新整形
  • AtomicLong 原子更新长整形
  • AtomicBoolean 原子更新 bool 值

atomic 里提供了三个类用于原子更新数组里面的元素:

  • AtomicIntegerArray:原子更新整形数组里的元素;
  • AtomicLongArray:原子更新长整形数组里的元素;
  • AtomicReferenceArray:原子更新引用数组里的元素。

原子更新基本类型的 AtomicInteger 只能更新一个变量,如果要原子更新多个变量,就需要使用原子更新引用类型提供的类了.原子引用类型 atomic 包主要提供了以下几个类:

  • AtomicReference:原子更新引用类型;
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
  • AtomicMarkableReference:原子更新带有标记位的引用类型.可以原子更新一个布尔类型的标记位和引用类型.构造方法是 AtomicMarkableReference(V initialRef, boolean initialMark)

如果需要原子更新某个对象的某个字段,就需要使用原子更新属性的相关类,atomic 中提供了一下几个类用于原子更新属性:

  • AtomicIntegerFieldUpdater:原子更新整形属性的更新器;
  • AtomicLongFieldUpdater:原子更新长整形的更新器;
  • AtomicStampedReference:原子更新带有版本号的引用类型.该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
  • AtomicMarkableReference 也可以解决 ABA 问题.

7.说说对 CountDownLatch 的理解?

CountDownLatch是 Java 中并发包(java.util.concurrent)提供的一个同步工具类,它的原理主要基于倒计数的方式。CountDownLatch允许一个或多个线程等待其他线程完成一组操作,然后再继续执行自己的任务。

CountDownLatch的原理如下:

  1. 初始化计数值:
    • 在创建CountDownLatch实例时,需要指定一个初始的计数值(count)。该计数值表示需要等待的操作的数量。
  2. 等待操作:
    • 在主线程或某个线程中,调用CountDownLatchawait()方法,该方法会使当前线程进入等待状态,直到计数值变为零。
    • 如果计数值当前不为零,则await()方法会一直阻塞当前线程,直到计数值为零。
  3. 完成操作:
    • 在其他线程中,执行一组操作,并在每个操作完成后,调用CountDownLatchcountDown()方法。每次调用countDown()方法,计数值减 1。
  4. 计数归零:
    • countDown()方法的调用次数累计达到初始计数值时,计数值将变为零。
  5. 继续执行:
    • 一旦计数值变为零,所有因调用await()方法而进入等待状态的线程都会被唤醒,继续执行后续的任务。

通过以上原理,CountDownLatch可以实现线程间的协作和同步,使得某个线程等待其他线程完成特定操作后再继续执行。这对于多线程场景中需要等待其他线程完成某些初始化、数据加载或任务执行的情况非常有用。一旦计数值变为零,所有等待的线程都会被唤醒,从而实现了线程的协调和同步。

latch.await();
latch.countDown();

8.同步屏障 CyclicBarrier 理解?

CyclicBarrier是 Java 中并发包(java.util.concurrent)提供的另一个同步工具类,它的原理是循环栅栏。CyclicBarrier允许一组线程互相等待,直到所有线程都到达一个公共的屏障点,然后再同时继续执行。

CyclicBarrier的原理如下:

  1. 初始化屏障点和参与线程数:
    • 在创建CyclicBarrier实例时,需要指定一个屏障点(barrier),表示所有线程都要等待达到的点。同时,需要指定参与线程数(parties),表示需要等待的线程数量。
  2. 等待到达屏障点:
    • 在各个线程中,调用CyclicBarrierawait()方法,该方法会使当前线程等待,直到所有线程都到达屏障点。
    • 每次调用await()方法,当前线程会被阻塞,直到所有参与线程都调用了await()方法。
  3. 达到屏障点后继续执行:
    • 一旦所有参与线程都调用了await()方法,它们都会在屏障点处等待。
    • 当所有参与线程都到达屏障点后,CyclicBarrier会解除所有线程的阻塞状态,使它们可以继续执行后续的任务。
  4. 循环使用:
    • CountDownLatch不同,CyclicBarrier是可以循环使用的。一旦所有线程都到达屏障点并被释放,CyclicBarrier会被重置,所有线程可以再次使用它进行下一轮的同步。

通过以上原理,CyclicBarrier可以实现多个线程之间的协作和同步,让它们在公共的屏障点处等待,直到所有线程都到达后再同时继续执行。这对于多个线程需要同时完成某个阶段,然后再一起继续执行后续阶段的情况非常有用。每当CyclicBarrier被重置,新的一轮同步过程又可以开始。

CyclicBarrier 依赖于 ReentrantLock 实现

barrier.await();
barrier.getNumberWaiting()

9. CyclicBarrier 和 CountDownLatch?

CyclicBarrierCountDownLatch是 Java 并发包中两种不同的同步工具,它们在使用场景和原理上有一些区别。

  1. 同步方式:
    • CyclicBarrier:采用循环栅栏的同步方式。多个线程在达到公共的屏障点处等待,直到所有线程都到达后才同时继续执行。CyclicBarrier可以循环使用,每当所有线程都到达屏障点并被释放,它会被重置,可以进行下一轮的同步。
    • CountDownLatch:采用倒计数的同步方式。一个或多个线程等待其他线程执行完成一组操作后再继续执行。CountDownLatch的计数值在创建时被初始化,每当一个线程完成一个操作,计数值减 1,直到计数值变为零时,等待的线程被唤醒。
  2. 参与方面:
    • CyclicBarrier:需要指定参与线程数,在每次等待时都要等待所有参与线程到达屏障点。
    • CountDownLatch:需要指定倒计数值,在倒计数值变为零时所有等待的线程都会被唤醒,不需要指定参与线程数。
  3. 循环使用:
    • CyclicBarrier可以循环使用,每次达到屏障点后,它会被重置,可以进行下一轮的同步。
    • CountDownLatch在计数值减为零后,无法重置或再次使用,一旦倒计数值为零,它就失去了继续等待其他线程的能力。
  4. 作用场景:
    • CyclicBarrier适用于多个线程需要等待其他线程同时到达某个屏障点后再一起继续执行的场景,常用于解决复杂任务的拆分和合并问题。
    • CountDownLatch适用于一个或多个线程需要等待其他线程完成一组操作后再继续执行的场景,常用于线程间协调和同步。

10.说说 Semaphore ?

Semaphore是 Java 中并发包(java.util.concurrent)提供的另一个同步工具类,它的原理基于信号量的概念。Semaphore用于控制同时访问某个共享资源的线程数量,它维护一个许可证(permit)的计数,用于限制同时访问共享资源的线程数量。

Semaphore的原理如下:

  1. 初始化许可证数量:
    • 在创建Semaphore实例时,需要指定初始的许可证数量。这个数量表示同时允许的线程数。
  2. 获取许可证:
    • 当一个线程需要访问共享资源时,它首先需要调用Semaphoreacquire()方法。如果当前许可证数量大于零,该线程将获得一个许可证,并将许可证数量减 1。这样,许可证数量就可以反映当前可用的资源数。
  3. 许可证数量为零时阻塞:
    • 如果当前许可证数量为零,即所有的许可证都被其他线程占用,那么acquire()方法将会阻塞当前线程,直到有其他线程释放许可证。
  4. 释放许可证:
    • 当一个线程使用完共享资源后,它需要调用Semaphorerelease()方法来释放许可证。这将增加许可证数量,并允许其他等待许可证的线程继续执行。

通过以上原理,Semaphore可以控制同时访问某个共享资源的线程数量,以防止过多的线程同时竞争资源导致资源过度消耗或产生冲突。Semaphore是一种有效的并发控制工具,常用于限制同时访问共享资源的线程数量,控制并发访问的并发性。

semaphore.acquire();//阻塞
semaphore.release();//释放
intavailablePermits():返回此信号量中当前可用的许可证数。
intgetQueueLength():返回正在等待获取许可证的线程数。
booleanhasQueuedThreads():是否有线程正在等待获取许可证。
void reducePermits(intreduction):减少reduction个许可证,是个protected方法。
CollectiongetQueuedThreads):返回所有等待获取许可证的线程集合,是个protected方法。

11.CopyOnWriteArrayList 原理?

它相当于线程安全的 ArrayList。和 ArrayList 一样,它是个可变数组;但是和 ArrayList 不同的时

它具有以下特性:

  • 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
  • 它是线程安全的。
  • 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
  • 迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
  • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

原理:

  • CopyOnWriteArrayList 实现了 List 接口,因此它是一个队列。
  • CopyOnWriteArrayList 包含了成员 lock。每一个 CopyOnWriteArrayList 都和一个监视器锁 lock 绑定,通过 lock,实现了对 CopyOnWriteArrayList 的互斥访问。
  • CopyOnWriteArrayList 包含了成员 array 数组,这说明 CopyOnWriteArrayList 本质上通过数组实现的。
  • CopyOnWriteArrayList 的“动态数组”机制 – 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”。这就是它叫做 CopyOnWriteArrayList 的原因!CopyOnWriteArrayList 就是通过这种方式实现的动态数组;不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,效率比较高。
  • CopyOnWriteArrayList 的“线程安全”机制 – 是通过 volatile 和监视器锁 Synchrnoized 来实现的。
  • CopyOnWriteArrayList 是通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读取到的数据总是最新的”这个机制的 保证。
  • CopyOnWriteArrayList 通过监视器锁 Synchrnoized 来保护数据。在“添加/修改/删除”数据时,会先“获取监视器锁”,再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”;这样,就达到了保护数据的目的。

12.LongAdder 原理

LongAdder 是 Java 并发包中的一个类,用于高效地支持并发计数操作。它在 Java 8 中被引入,是对 AtomicLong 的改进和优化。

在多线程环境下,通常需要对共享的计数器进行增加操作。传统的 AtomicLong 类在高并发环境下会存在性能问题,因为它使用 CAS(Compare and Swap)指令来保证操作的原子性。在高并发情况下,多个线程竞争同一个 AtomicLong 实例可能导致大量的 CAS 操作,从而降低性能。

LongAdder 通过在内部使用一种更加高效的技术,将计数分散到多个变量中,从而减少了竞争。它维护了一个数组来保存多个变量,每个线程在进行计数操作时会根据哈希算法选择一个特定的变量进行增加,而不是像 AtomicLong 那样直接竞争一个变量。

这样做的好处是,在高并发情况下,线程之间几乎没有竞争,从而减少了 CAS 操作的次数,提高了并发性能。当需要获取总计数时,LongAdder 将所有变量的值求和得到结果。

使用 LongAdder 时,你可以通过调用 add() 方法增加计数,也可以通过 sum() 方法获取当前的总计数。

以下是 LongAdder 的简单示例:

import java.util.concurrent.atomic.LongAdder;

public class LongAdderExample {
    public static void main(String[] args) {
        LongAdder counter = new LongAdder();

        // 多个线程增加计数
        Runnable incrementTask = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.add(1);
            }
        };

        Thread thread1 = new Thread(incrementTask);
        Thread thread2 = new Thread(incrementTask);

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 获取总计数
        long total = counter.sum();
        System.out.println("Total count: " + total);
    }
}

总结一下,LongAdder 是在高并发环境下用于替代 AtomicLong 的一种高效并发计数器。它通过将计数分散到多个变量中,减少了线程之间的竞争,从而提高了并发性能。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。