并发编程进阶-06

举报
kwan的解忧杂货铺 发表于 2024/08/12 22:50:10 2024/08/12
【摘要】 1.谈谈常见的锁有哪些?下面是对锁的粒度、乐观锁/悲观锁、公平锁/非公平锁、排他锁/共享锁、读写锁、自旋锁的描述和区分,使用表格的形式美化并用中文描述:类别描述锁的粒度锁的粒度指的是锁定的范围,即锁保护的是整个对象还是对象的一部分。synchronized 锁的状态Java 中最常用的锁机制,使用关键字synchronized实现。在锁定对象时,对象的状态可以是无锁、偏向锁、轻量级锁或重量...

1.谈谈常见的锁有哪些?

image-20231021173706822

下面是对锁的粒度、乐观锁/悲观锁、公平锁/非公平锁、排他锁/共享锁、读写锁、自旋锁的描述和区分,使用表格的形式美化并用中文描述:

类别 描述
锁的粒度 锁的粒度指的是锁定的范围,即锁保护的是整个对象还是对象的一部分。
synchronized 锁的状态 Java 中最常用的锁机制,使用关键字synchronized实现。在锁定对象时,对象的状态可以是无锁、偏向锁、轻量级锁或重量级锁。
偏向锁 偏向锁是指当只有一个线程访问对象的同步块时,该线程会自动获取锁,避免了多次加锁和解锁的开销,提高性能。
轻量级锁 轻量级锁是指当多个线程争用锁时,通过自旋的方式进行一段时间的忙等待,不进入阻塞状态。如果在自旋期间成功获取到锁,那么就是轻量级锁;否则,升级为重量级锁。
重量级锁 重量级锁是指当多个线程争用锁时,无法获取锁的线程会进入阻塞状态,释放 CPU 资源,直到持有锁的线程释放锁,阻塞的线程才会被唤醒。重量级锁的状态切换需要在用户态和内核态之间进行上下文切换,开销较大。
乐观锁/悲观锁 乐观锁和悲观锁是针对并发访问的数据的不同策略。
悲观锁 悲观锁认为"坏事一定会发生",在操作数据前先锁定数据,避免其他线程修改数据,保证数据的一致性。悲观锁常用于 synchronized、ReentrantLock 等锁机制。
乐观锁 乐观锁认为"坏事未必会发生",在操作数据时不会立即锁定数据,而是先进行操作,然后再检查是否有其他线程同时修改了数据。如果没有冲突,则操作成功;如果发现冲突,则需要进行回滚或重试操作。乐观锁的实现常用的方式是 CAS(Compare And Swap)。
公平锁/非公平锁 公平锁和非公平锁是针对线程获取锁的顺序的不同策略。
公平锁 公平锁遵循"先到先得"的原则,等待时间最长的线程优先获取锁。所有线程按照它们请求锁的顺序进行获取。
非公平锁 非公平锁是一种抢占式的锁策略,没有严格的获取顺序。新来的线程有可能在老的线程之前获取到锁。非公平锁通过减少线程上下文切换的次数,提高了性能。ReentrantLock 可以作为公平锁或非公平锁使用。
排他锁/共享锁 排他锁和共享锁是针对多个线程对同一资源的访问权限的不同策略。
排他锁 排他锁也称为写锁,表示只有一个线程能够独占地获取锁,其他线程不能同时访问被锁定的代码。在排他锁下,线程在获取到锁之前都会被阻塞。ReentrantLock 和 synchronized 都可以实现排他锁。
共享锁 共享锁也称为读锁,表示多个线程可以同时获取锁并共享被锁定的代码。只有在没有任何线程持有写锁的情况下,才能获取读锁。在共享锁下,多个线程可以同时读取数据,但不能写入数据。ReentrantReadWriteLock 是实现读写锁的一种机制。
读写锁 读写锁是一种特殊的锁机制,它允许多个线程同时读取数据,但只允许一个线程写入数据。在读写锁下,当没有线程持有写锁时,多个线程可以获取读锁;当有线程持有写锁时,其他线程不能获取读锁或写锁。读写锁适用于对读操作较多、写操作较少的情况,可以提高并发性。ReentrantReadWriteLock 是实现读写锁的一种机制。
自旋锁 自旋锁是一种乐观锁的实现方式,当一个线程尝试获取锁时,如果锁已经被其他线程持有,该线程会循环等待,不断尝试获取锁,直到获取成功。自旋锁适用于锁的持有时间非常短暂的场景,避免了线程切换的开销。但是如果锁的持有时间较长,自旋会导致 CPU 资源的浪费。自旋锁的实现常用的方式是 CAS。

2.什么是 AQS?

AQS(AbstractQueuedSynchronizer)是 Java 并发包中实现锁、同步器等的基础框架,它提供了一种便于实现自定义同步器的模板方法。AQS 的核心思想是使用一个 FIFO 双向链表(即等待队列)来管理线程的状态和竞争资源的获取。AQS 基于这个等待队列,通过使用 volatile 变量和 CAS(Compare And Swap)指令来实现线程的等待和唤醒、资源的获取和释放。

AQS 的主要原理如下:

  1. 状态管理:
    • AQS 使用一个整型的状态变量(state)来表示资源的状态。线程在访问共享资源时,需要通过 CAS 原子操作来获取或释放这个状态变量。根据不同的同步器实现,状态变量可以表示锁的状态、信号量的剩余许可数等。
  2. 等待队列:
    • AQS 内部维护一个 FIFO 双向链表,用于保存等待获取资源的线程。这个等待队列是 AQS 的核心数据结构,它管理着所有在同步器上等待的线程。等待队列中的每个节点代表一个等待线程。
  3. 线程状态转换:
    • 当一个线程需要获取资源时,如果发现资源已被其他线程占用,则该线程会进入等待状态。在 AQS 中,等待状态有两种:独占模式(exclusive mode)和共享模式(shared mode)。独占模式用于实现排他锁,而共享模式用于实现读写锁等并发控制机制。
  4. 等待与唤醒:
    • 当一个线程需要等待资源时,它会被包装成一个等待节点(Node)并加入到等待队列中。等待节点会被挂起,进入等待状态。当资源被释放或满足某个条件时,AQS 会根据具体的同步器规则,从等待队列中唤醒等待的线程,使其重新进入就绪状态,准备竞争资源。
  5. 自旋与阻塞:
    • 在 AQS 中,线程在等待状态时,会进行自旋尝试获取资源。自旋是指线程在不断地检查资源的状态,如果资源可用则获取;否则,线程可能会自旋等待一段时间。如果自旋等待仍然无法获取资源,线程将被阻塞,进入阻塞状态,不再占用 CPU 资源。

总结下 AQS 是什么:

  • AQS 是一个同步的基础框架,基于一个先进先出的队列
  • 锁机制依赖一个原子值的状态。
  • AQS 的子类负责定义与操作这个状态值,但必须通过 AQS 提供的原子操作。
  • AQS 剩余的方法就是围绕队列,与线程阻塞唤醒等功能。

4.Node 和 ConditionObject

AQS 中有两个重要的成员变量:Node 和 ConditionObject

  • Node
    • Node 的作用是存储获取锁失败的线程,并且维护一个 CLH FIFO 队列,该队列是会被多线程操作的,所以 Node 中大部分变量都是被 volatile 修饰,并且通过自旋和 CAS 进行原子性的操作。
    • 有一个模式的属性:独占模式和共享模式,独占模式下资源是线程独占的,共享模式下,资源是可以被多个线程占用的。Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已.
  • ConditionObject
    • 条件队列
    • 该类主要是为了让子类实现独占模式。AQS 框架下独占模式的获取资源、释放等操作到最后都是基于这个类实现的。只有在独占模式下才会去使用该类。

node 方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出 npe
nextWaiter 指向下一个处于 CONDITION 状态的节点
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 共享,多个线程可同时执行,如Semaphore/CountDownLatch
EXCLUSIVE 独占,只有一个线程能执行,如ReentrantLock

5.AQS 基于什么设计模式实现的?

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

同步器可重写的方法如下图所示:

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态

实现自定义同步组件时,将会调用同步器提供的模板方法

方法名称 描述
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于 0 的值,表示获取成功,反之,获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占

6.AQS 底层同步队列的原理?

同步器依赖内部的同步队列(一个 FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述.

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg) 与 acquire(intarg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException 并返回
boolean tryAcquireNanos(int arg,long nanos) 在 acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回 false,如果获取到了返回 true
void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进人同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInteruptibly(int arg) 与 acquireShared(intarg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg,long nanos) 在 acquireSharedInterruptibly(intarg)基础上增加了超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection<Thread>getQueuedThreads 获取等待在同步队列上的线程集合

同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部

image-20220415133145679

同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点.试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于 CAS 的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联

同步队列遵循 FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。

image-20220415133223169

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用 CAS 来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的 next 引用即可。

AQS会旋转几次获取锁:

会旋转 2 次,第一次的时候,未获取到会生成队列节点,第二次是是否为头结点,第二次可以是多次,可能出现非公平锁的饥饿状态,获取锁的过程实际上是获取 state 状态.

7.AQS 独占式同步状态获取与释放?

AQS 的 acquire 方法

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

通过调用同步器的 acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的 tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过 addWaiter(Node node)方法将该节点加入到同步队列的尾部,当出现竞争时,采用 CAS 的方式加入到同步队列的尾部.最后调用 acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态.如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  //快速尝试在尾部添加
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}


private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize  头节点可能为空
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

上述代码通过使用 compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。

在 enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过 CAS 将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置.可以看出,enq(final Node node)方法将并发添加节点的请求通过 CAS 变得“串行化”了。

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

如果前驱节点是头节点,则尝试获取同步锁.而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。如果前驱节点不是头节点,则获取同步锁失败,那么线程继续在同步队列中等待.独占式同步状态获取流程,也就是 acquire(int arg)方法调用流程如下所示:

image-20240117173600591

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态.通过调用同步器的 release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态).该方法代码如下所示:

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}
private void unparkSuccessor(Node node) {

  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);
}

该方法执行时,会唤醒头节点的后继节点线程, unparkSuccessor(Node node)方法使用 LockSupport 来唤醒处于等待状态的线程。

image-20231021174009263

8.共享式同步状态获取与释放?

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态.以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。在 acquireShared(int arg)方法中,同步器调用 tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为 int 类型,当返回值大于等于 0 时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(intarg)方法返回值大于等于 0。在 doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于 0,表示该次获取同步状态成功并从自旋过程中退出。

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点.对于能够支持多个线程同时访问的并发组件,它和独占式主要区别在于 tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和 CAS 来保证的,因为释放同步状态的操作可能会同时来自多个线程。

//获取同步状态
public final void acquireShared(int arg) {
  if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

//释放同步状态
private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue; // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;// loop on failed CAS
    }
    if (h == head) // loop if head changed
      break;
  }
}

9.AQS 独占式超时获取锁和可中断获取锁?

在 Java 5 之前,当一个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在 synchronized 上,等待着获取锁.在 Java 5 中,同步器提供了 acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出 InterruptedException。

超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”, doAcquireNanos(int

arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性.针对超时获取,主要需要计算出需要睡眠的时间间隔 nanosTimeout,为了防止过早通知, nanosTimeout 计算公式为: nanosTimeout-=now-lastTime,其中 now 为当前唤醒时间, lastTime 为上次唤醒时间,如果 nanosTimeout 大于 0 则表示超时时间未到,需要继续睡眠 nanosTimeout 纳秒,反之,表示已经超时。

private boolean doAcquireNanos(int arg, long nanosTimeout)
  throws InterruptedException {
  if (nanosTimeout <= 0L)
    return false;
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
      }
      nanosTimeout = deadline - System.nanoTime();
      if (nanosTimeout <= 0L)
        return false;
      if (shouldParkAfterFailedAcquire(p, node) &&
          nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
      if (Thread.interrupted())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

如果 nanosTimeout 小于等于 spinForTimeoutThreshold (1000 纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程.原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让 nanosTimeout 的超时从整体上表现得反而不精确.因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。

image-20231219111638699

10.AQS 实现的工具类?

  • AbstractQueuedSynchronizer (java.util.concurrent.locks)

    • CountDownLatch (java.util.concurrent)

    • ThreadPoolExecutor (java.util.concurrent)

    • LimitLatch (org.apache.tomcat.util.threads)

    • ReentrantLock (java.util.concurrent.locks)

    • ReentrantReadWriteLock (java.util.concurrent.locks)

    • Semaphore (java.util.concurrent)

11.tryAcquireShared(int)函数返回值?

  • 负数:表示获取失败;
  • 0:获取成功,但没有剩余资源;
  • 正数:获取成功,且有剩余资源;
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。