从一个简单的Java单例示例谈谈并发(下)

举报
且听风吟 发表于 2020/02/16 13:24:23 2020/02/16
【摘要】 locks锁是用来控制多个线程访问共享资源的形式,Java SE 5之后,J.U.C中新增了locks来实现锁功能,它提供了与synchronized关键字类似的同步功能。只是在使用时需要显示的获取和释放锁。虽然它缺少了隐式获取和释放锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁及超时获取锁等多种synchronized关键字不具备的同步特性。locks在这我们只介绍下核心的...

locks

锁是用来控制多个线程访问共享资源的形式,Java SE 5之后,J.U.C中新增了locks来实现锁功能,它提供了与synchronized关键字类似的同步功能。只是在使用时需要显示的获取和释放锁。虽然它缺少了隐式获取和释放锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁及超时获取锁等多种synchronized关键字不具备的同步特性。

locks在这我们只介绍下核心的AQS(AbstractQueuedSynchronizer,队列同步器),AQS是用来构建锁或者其他同步组件的基础框架,它使用一个用volatile修饰的int成员变量表示同步状态。通过内置的FIFO队列来完成资源获取线程的排队工作。同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程免不了要对同步状态进行更改,这时候就会使用到AQS提供的3个方法:getState()、setState()和compareAndSetState()来进行操作,这是因为它们能够保证状态的改变是原子性的。为什么这么设计呢?因为锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节,而AQS面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和AQS很好的隔离了使用者和实现者锁关注的领域。 

现在我们就自定义一个独占锁来详细解释下AQS的实现机制

public class Mutex implements Lock { 
	private static class Sync extends AbstractQueuedSynchronizer { 
		private static final long serialVersionUID = -4387327721959839431L; 
		protected boolean isHeldExclusively() { 
			return getState() == 1; 
		} 
		public boolean tryAcquire(int acquires) { 
			assert acquires == 1; 
			// Otherwise unused 
			if (compareAndSetState(0, 1)) { 
				setExclusiveOwnerThread(Thread.currentThread()); 
				return true; 
			} 
			return false; 
		} 
		protected boolean tryRelease(int releases) { 
			assert releases == 1; 
			// Otherwise unused 
			if (getState() == 0) throw new IllegalMonitorStateException(); 
			setExclusiveOwnerThread(null); 
			setState(0); 
			return true; 
		} 
		Condition newCondition() { 
			return new ConditionObject(); 
		} 
	} 
	private final Sync sync = new Sync(); 
	public void lock() { 
		sync.acquire(1); 
	} 
	public boolean tryLock() { 
		return sync.tryAcquire(1); 
	} 
	public void unlock() { 
		sync.release(1); 
	} 
	public Condition newCondition() { 
		return sync.newCondition(); 
	} 
	public boolean isLocked() { 
		return sync.isHeldExclusively(); 
	} 
	public boolean hasQueuedThreads() { 
		return sync.hasQueuedThreads(); 
	} 
	public void lockInterruptibly() throws InterruptedException { 
		sync.acquireInterruptibly(1); 
	} 
	public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { 
		return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 
	} 
}

实现自定义组件的时候,我们可以看到,AQS可重写的方法是tryAcquire()——独占式获取同步状态、tryRelease()——独占式释放同步状态、tryAcquireShared()——共享式获取同步状态、tryReleaseShared ()——共享式释放同步状态、isHeldExclusively()——是否被当前线程所独占。这个示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire()中,如果经过CAS设置成功(同步状态设置为1),则表示获取了同步状态,而在tryRelease()中,只是将同步状态重置为0。接着我们对比一下重入锁(ReentrantLock)的源码实现

public class ReentrantLock implements Lock, java.io.Serializable { 
	private static final long serialVersionUID = 7373984872572414699L; 
	/** Synchronizer providing all implementation mechanics */ 
	private final Sync sync; 
	/** * Base of synchronization control for this lock. Subclassed 
	* into fair and nonfair versions below. Uses AQS state to 
	* represent the number of holds on the lock. */ 
	abstract static class Sync extends AbstractQueuedSynchronizer { 
		private static final long serialVersionUID = -5179523762034025860L; 
		/** * Performs {@link Lock#lock}. The main reason for subclassing 
		* is to allow fast path for nonfair version. */ 
		abstract void lock(); 
		/** * Performs non-fair tryLock. tryAcquire is 
		* implemented in subclasses, but both need nonfair 
		* try for trylock method. */ 
		final boolean nonfairTryAcquire(int acquires) { 
			final Thread current = Thread.currentThread(); 
			int c = getState(); 
			if (c == 0) { 
				if (compareAndSetState(0, acquires)) { 
					setExclusiveOwnerThread(current); return true; 
				} 
			} else if (current == getExclusiveOwnerThread()) {
				int nextc = c + acquires; 
				if (nextc < 0) 
				// overflow 
					throw new Error("Maximum lock count exceeded"); 
				setState(nextc); 
				return true; 
			} 
			return false; 
		} 
		protected final boolean tryRelease(int releases) { 
			int c = getState() - releases; 
			if (Thread.currentThread() != getExclusiveOwnerThread()) 
				throw new IllegalMonitorStateException(); 
			boolean free = false; 
			if (c == 0) { 
				free = true; 
				setExclusiveOwnerThread(null); 
			} 
			setState(c); 
			return free;
		} 
		protected final boolean isHeldExclusively() {
		// While we must in general read state before owner, 
		// we don't need to do so to check 
			if current thread is owner 
				return getExclusiveOwnerThread() == Thread.currentThread(); 
		} 
		final ConditionObject newCondition() { 
			return new ConditionObject(); 
		} 
		// Methods relayed from outer class 
		final Thread getOwner() { 
			return getState() == 0 ? null : getExclusiveOwnerThread(); 
		} 
		final int getHoldCount() { 
			return isHeldExclusively() ? getState() : 0; 
		} 
		final boolean isLocked() { 
			return getState() != 0; 
		} 
		/** * Reconstitutes this lock instance from a stream. 
		* @param s the stream */ 
		private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { 
			s.defaultReadObject(); 
			setState(0); 
			// reset to unlocked state 
		} 
	} 	
	/** * Sync object for non-fair locks */ 
	static final class NonfairSync extends Sync { 
		//todo sth... 
	} 
	//todo sth... 
}

重入锁分公平锁和不公平锁,默认使用的是不公平锁,在这我们看到实现重入锁大体上跟我们刚才自定义的独占锁差不多,但是有什么区别呢?我们看看重入锁nonfairTryAcquire()方法实现:首先获取同步状态(默认是0),如果是0的话,CAS设置同步状态,非0的话则判断当前线程是否已占有锁,如果是的话,则偏向更新同步状态。从这里我们不难推断出重入锁的概念,同一个线程可以多次获得同一把锁,在释放的时候也必须释放相同次数的锁。通过对比相信大家对自定义一个锁有了一个初步的概念,也许你存在疑问我们重写的这几个方法在AQS哪地方用呢?现在我们来继续往下跟踪,我们深入跟踪下刚才自定义独占锁lock()方法里面acquire()的实现

    image.png

这个方法在AQS类里面,看到里面的tryAcquire(arg)大家也就明白了,tryAcquire(arg)方法获取同步状态,后面acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法就是说的节点构造、加入同步队列及在同步队列中自旋等待的AQS没暴露给我们的相关操作。大体的流程就是首先调用自定义同步器实现的tryAcquire()方法,该方法保证线程安全的获取同步状态,如果获取同步状态失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter()方法将该节点加入到同步队列的尾部,最后调用acquireQueued()方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要靠前驱节点的出队或阻塞线程被中断来实现。也许你还是不明白刚才所说的,那么我们继续跟踪下addWaiter()方法的实现

     image.png

上面的代码通过使用compareAndSetTail()方法来确保节点能够被线程安全添加。在enq()方法中,同步器通过“死循环”来确保节点的正确添加,在”死循环“中只有通过CAS将节点设置成为尾节点之后,当前线程才能够从该方法返回,否则,当前线程不断地尝试重试设置。

在节点进入同步队列之后,发生了什么呢?现在我们继续跟踪下acquireQueued()方法

     image.png

从上面的代码我们不难看出,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省的观察,当条件满足时(自己的前驱节点是头节点就进行CAS设置同步状态)就获得同步状态,然后就可以从自旋的过程中退出,否则依旧在这个自旋的过程中。

collections

从前面的思维导图我们可以看到并发容器包括链表、队列、HashMap等.它们都是线程安全的。

  • ConcurrentHashMap : 一个高效的线程安全的HashMap。

  • CopyOnWriteArrayList : 在读多写少的场景中,性能非常好,远远高于vector。

  • ConcurrentLinkedQueue : 高效并发队列,使用链表实现,可以看成线程安全的LinkedList。

  • BlockingQueue : 一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用作数据共享 。

  • ConcurrentSkipListMap : 跳表的实现,这是一个Map,使用跳表数据结构进行快速查找 。

另外Collections工具类可以帮助我们将任意集合包装成线程安全的集合。在这里重点说下ConcurrentHashMap和BlockingQueue这两个并发容器。

我们都知道HashMap线程不安全的,而我们可以通过Collections.synchronizedMap(new HashMap<>())来包装一个线程安全的HashMap或者使用线程安全的HashTable,但是它们的效率都不是很好,这时候我们就有了ConcurrentHashMap。为什么ConcurrentHashMap高效且线程安全呢?其实它使用了锁分段技术来提高了并发的访问率。假如容器里有多把锁,每一把锁用于锁容器的一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效地提高并发访问效率,这就是锁分段技术。首先将数据分成一段段的存储,然后给每段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。而既然数据被分成了多个段,线程如何定位要访问的段的数据呢?这里其实是通过散列算法来定位的。

现在来谈谈阻塞队列,阻塞队列其实跟后面要谈的线程池息息相关的,JDK7提供了7个阻塞队列,分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。  

如果队列是空的,消费者会一直等待,当生产者添加元素时候,消费者是如何知道当前队列有元素的呢?如果让你来设计阻塞队列你会如何设计,让生产者和消费者能够高效率的进行通讯呢?让我们先来看看JDK是如何实现的。

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码如下:

     image.png

当我们往队列里插入一个元素时,如果队列不可用,阻塞生产者主要通过LockSupport.park(this)来实现

     image.png

继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。

       image.png

unsafe.park是个native方法,代码如下:

       image.png

park这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。

     

  • 与park对应的unpark执行或已经执行时。注意:已经执行是指unpark先执行,然后再执行的park。

  • 线程被中断时。

  • 如果参数中的time不是零,等待了指定的毫秒数时。

  • 发生异常现象时。这些异常事先无法确定。  

我们继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码如下:

     image.png

pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。

当队列满时,生产者往阻塞队列里插入一个元素,生产者线程会进入WAITING (parking)状态。  

executor

Executor框架提供了各种类型的线程池,不同的线程池应用了前面介绍的不同的堵塞队列

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类。 对于核心的几个线程池,无论是newFixedThreadPool()、newSingleThreadExecutor()还是newCacheThreadPool()方法,虽然看起来创建的线程具有完全不同的功能特点,但其内部均使用了ThreadPoolExecutor实现

      image.png

  • newFixedThreadPool()方法的实现,它返回一个corePoolSize和maximumPoolSize一样的,并使用了LinkedBlockingQueue任务队列(无界队列)的线程池。当任务提交非常频繁时,该队列可能迅速膨胀,从而系统资源耗尽。

  • newSingleThreadExecutor()返回单线程线程池,是newFixedThreadPool()方法的退化,只是简单的将线程池数量设置为1。

  • newCachedThreadPool()方法返回corePoolSize为0而maximumPoolSize无穷大的线程池,这意味着没有任务的时候线程池内没有现场,而当任务提交时,该线程池使用空闲线程执行任务,若无空闲则将任务加入SynchronousQueue队列,而SynchronousQueue队列是直接提交队列,它总是破事线程池增加新的线程来执行任务。当任务执行完后由于corePoolSize为0,因此空闲线程在指定时间内(60s)被回收。对于newCachedThreadPool(),如果有大量任务提交,而任务又不那么快执行时,那么系统变回开启等量的线程处理,这样做法可能会很快耗尽系统的资源,因为它会增加无穷大数量的线程。

     

由以上线程池的实现可以看到,它们都只是ThreadPoolExecutor类的封装。我们看下ThreadPoolExecutor最重要的构造函数:

      image.png

ThreadPoolExecutor的任务调度逻辑如下

从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  • 首先线程池判断基本线程池是否已满,如果没满,创建一个工作线程来执行任务。满了,则进入下个流程。

  • 其次线程池判断工作队列是否已满,如果没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

  • 最后线程池判断整个线程池是否已满,如果没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。 

下面我们来看看ThreadPoolExecutor核心调度代码

      image.png

从上面的源码我们可以知道execute的执行步骤:

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

  • 如果运行的线程等于或多于corePoolSize,则将任务加入到BlockingQueue。

  • 如果无法将任务假如BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

  • 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能的避免获取全局锁(那将会是一个严重的 可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

参考阅读

本文部分内容参考自《Java并发编程的艺术》、《深入理解Java虚拟机(第2版)》、《实战Java高并发程序设计》、《深入Java内存模型》、《Java并发编程实践》,感兴趣的可自行查阅。


本文转载自异步社区。

原文链接:https://www.epubit.com/articleDetails?id=NC7E3EF90A570000167F41710130BBAA0

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。