【精通高并发系列】两种异步模型与深度解析Future接口(二)

举报
冰 河 发表于 2021/07/27 20:06:44 2021/07/27
【摘要】 本文有点长,但是满满的干货,以实际案例的形式分析了两种异步模型,并从源码角度深度解析Future接口和FutureTask类,希望大家踏下心来,打开你的IDE,跟着文章看源码,相信你一定收获不小!

大家好,我是冰河~~
我们继续聊两种异步模型与深度解析Future接口这个话题~~

二、深度解析Future接口

1.Future****接口

Future是JDK1.5新增的异步编程接口,其源代码如下所示。

package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看到,在Future接口中,总共定义了5个抽象方法。接下来,就分别介绍下这5个方法的含义。

  • cancel(boolean)

取消任务的执行,接收一个boolean类型的参数,成功取消任务,则返回true,否则返回false。当任务已经完成,已经结束或者因其他原因不能取消时,方法会返回false,表示任务取消失败。当任务未启动调用了此方法,并且结果返回true(取消成功),则当前任务不再运行。如果任务已经启动,会根据当前传递的boolean类型的参数来决定是否中断当前运行的线程来取消当前运行的任务。

  • isCancelled()

判断任务在完成之前是否被取消,如果在任务完成之前被取消,则返回true;否则,返回false。

这里需要注意一个细节:只有任务未启动,或者在完成之前被取消,才会返回true,表示任务已经被成功取消。其他情况都会返回false

  • isDone()

判断任务是否已经完成,如果任务正常结束、抛出异常退出、被取消,都会返回true,表示任务已经完成。

  • get()

当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成并返回任务的结果数据。

  • get(long, TimeUnit)

当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成,并设置了超时等待时间。在超时时间内任务完成,则返回结果;否则,抛出TimeoutException异常。

2.RunnableFuture接口

Future接口有一个重要的子接口,那就是RunnableFuture接口,RunnableFuture接口不但继承了Future接口,而且继承了java.lang.Runnable接口,其源代码如下所示。

package java.util.concurrent;

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

这里,问一下,RunnableFuture接口中有几个抽象方法?想好了再说!哈哈哈。。。

这个接口比较简单run()方法就是运行任务时调用的方法。

3.FutureTask类

FutureTask类是RunnableFuture接口的一个非常重要的实现类,它实现了RunnableFuture接口、Future接口和Runnable接口的所有方法。FutureTask类的源代码比较多,这个就不粘贴了,大家自行到java.util.concurrent下查看。

(1)FutureTask类中的变量与常量

在FutureTask类中首先定义了一个状态变量state,这个变量使用了volatile关键字修饰,这里,大家只需要知道volatile关键字通过内存屏障和禁止重排序优化来实现线程安全,后续会单独深度分析volatile关键字是如何保证线程安全的。紧接着,定义了几个任务运行时的状态常量,如下所示。

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

其中,代码注释中给出了几个可能的状态变更流程,如下所示。

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

接下来,定义了其他几个成员变量,如下所示。

private Callable<V> callable;
private Object outcome; 
private volatile Thread runner;
private volatile WaitNode waiters;

又看到我们所熟悉的Callable接口了,Callable接口那肯定就是用来调用call()方法执行具体任务了。

  • outcome:Object类型,表示通过get()方法获取到的结果数据或者异常信息。
  • runner:运行Callable的线程,运行期间会使用CAS保证线程安全,这里大家只需要知道CAS是Java保证线程安全的一种方式,后续文章中会深度分析CAS如何保证线程安全。
  • waiters:WaitNode类型的变量,表示等待线程的堆栈,在FutureTask的实现中,会通过CAS结合此堆栈交换任务的运行状态。

看一下WaitNode类的定义,如下所示。

static final class WaitNode {
	volatile Thread thread;
	volatile WaitNode next;
	WaitNode() { thread = Thread.currentThread(); }
}

可以看到,WaitNode类是FutureTask类的静态内部类,类中定义了一个Thread成员变量和指向下一个WaitNode节点的引用。其中通过构造方法将thread变量设置为当前线程。

(2)构造方法

接下来,是FutureTask的两个构造方法,比较简单,如下所示。

public FutureTask(Callable<V> callable) {
	if (callable == null)
		throw new NullPointerException();
	this.callable = callable;
	this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
	this.callable = Executors.callable(runnable, result);
	this.state = NEW;
}

(3)是否取消与完成方法

继续向下看源码,看到一个任务是否取消的方法,和一个任务是否完成的方法,如下所示。

public boolean isCancelled() {
	return state >= CANCELLED;
}

public boolean isDone() {
	return state != NEW;
}

这两方法中,都是通过判断任务的状态来判定任务是否已取消和已完成的。为啥会这样判断呢?再次查看FutureTask类中定义的状态常量发现,其常量的定义是有规律的,并不是随意定义的。其中,大于或者等于CANCELLED的常量为CANCELLED、INTERRUPTING和INTERRUPTED,这三个状态均可以表示线程已经被取消。当状态不等于NEW时,可以表示任务已经完成。

通过这里,大家可以学到一点:以后在编码过程中,要按照规律来定义自己使用的状态,尤其是涉及到业务中有频繁的状态变更的操作,有规律的状态可使业务处理变得事半功倍,这也是通过看别人的源码设计能够学到的,这里,建议大家还是多看别人写的优秀的开源框架的源码。

(4)取消方法

我们继续向下看源码,接下来,看到的是cancel(boolean)方法,如下所示。

public boolean cancel(boolean mayInterruptIfRunning) {
	if (!(state == NEW &&
		  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
			  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
		return false;
	try {    // in case call to interrupt throws exception
		if (mayInterruptIfRunning) {
			try {
				Thread t = runner;
				if (t != null)
					t.interrupt();
			} finally { // final state
				UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
			}
		}
	} finally {
		finishCompletion();
	}
	return true;
}

接下来,拆解cancel(boolean)方法。在cancel(boolean)方法中,首先判断任务的状态和CAS的操作结果,如果任务的状态不等于NEW或者CAS的操作返回false,则直接返回false,表示任务取消失败。如下所示。

if (!(state == NEW &&
	  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
		  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
	return false;

接下来,在try代码块中,首先判断是否可以中断当前任务所在的线程来取消任务的运行。如果可以中断当前任务所在的线程,则以一个Thread临时变量来指向运行任务的线程,当指向的变量不为空时,调用线程对象的interrupt()方法来中断线程的运行,最后将线程标记为被中断的状态。如下所示。

try {
	if (mayInterruptIfRunning) {
		try {
			Thread t = runner;
			if (t != null)
				t.interrupt();
		} finally { // final state
			UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
		}
	}
}

这里,发现变更任务状态使用的是UNSAFE.putOrderedInt()方法,这个方法是个什么鬼呢?点进去看一下,如下所示。

public native void putOrderedInt(Object var1, long var2, int var4);

可以看到,又是一个本地方法,嘿嘿,这里先不管它,后续文章会详解这些方法的作用。

接下来,cancel(boolean)方法会进入finally代码块,如下所示。

finally {
	finishCompletion();
}

可以看到在finallly代码块中调用了finishCompletion()方法,顾名思义,finishCompletion()方法表示结束任务的运行,接下来看看它是如何实现的。点到finishCompletion()方法中看一下,如下所示。

private void finishCompletion() {
	// assert state > COMPLETING;
	for (WaitNode q; (q = waiters) != null;) {
		if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
			for (;;) {
				Thread t = q.thread;
				if (t != null) {
					q.thread = null;
					LockSupport.unpark(t);
				}
				WaitNode next = q.next;
				if (next == null)
					break;
				q.next = null; // unlink to help gc
				q = next;
			}
			break;
		}
	}
	done();
	callable = null;        // to reduce footprint
}

在finishCompletion()方法中,首先定义一个for循环,循环终止因子为waiters为null,在循环中,判断CAS操作是否成功,如果成功进行if条件中的逻辑。首先,定义一个for自旋循环,在自旋循环体中,唤醒WaitNode堆栈中的线程,使其运行完成。当WaitNode堆栈中的线程运行完成后,通过break退出外层for循环。接下来调用done()方法。done()方法又是个什么鬼呢?点进去看一下,如下所示。

protected void done() { }

可以看到,done()方法是一个空的方法体,交由子类来实现具体的业务逻辑。

当我们的具体业务中,需要在取消任务时,执行一些额外的业务逻辑,可以在子类中覆写done()方法的实现。

(5)get()方法

继续向下看FutureTask类的代码,FutureTask类中实现了两个get()方法,如下所示。

public V get() throws InterruptedException, ExecutionException {
	int s = state;
	if (s <= COMPLETING)
		s = awaitDone(false, 0L);
	return report(s);
}

public V get(long timeout, TimeUnit unit)
	throws InterruptedException, ExecutionException, TimeoutException {
	if (unit == null)
		throw new NullPointerException();
	int s = state;
	if (s <= COMPLETING &&
		(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
		throw new TimeoutException();
	return report(s);
}

没参数的get()方法为当任务未运行完成时,会阻塞,直到返回任务结果。有参数的get()方法为当任务未运行完成,并且等待时间超出了超时时间,会TimeoutException异常。

两个get()方法的主要逻辑差不多,一个没有超时设置,一个有超时设置,这里说一下主要逻辑。判断任务的当前状态是否小于或者等于COMPLETING,也就是说,任务是NEW状态或者COMPLETING,调用awaitDone()方法,看下awaitDone()方法的实现,如下所示。

private int awaitDone(boolean timed, long nanos)
	throws InterruptedException {
	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	WaitNode q = null;
	boolean queued = false;
	for (;;) {
		if (Thread.interrupted()) {
			removeWaiter(q);
			throw new InterruptedException();
		}

		int s = state;
		if (s > COMPLETING) {
			if (q != null)
				q.thread = null;
			return s;
		}
		else if (s == COMPLETING) // cannot time out yet
			Thread.yield();
		else if (q == null)
			q = new WaitNode();
		else if (!queued)
			queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
												 q.next = waiters, q);
		else if (timed) {
			nanos = deadline - System.nanoTime();
			if (nanos <= 0L) {
				removeWaiter(q);
				return state;
			}
			LockSupport.parkNanos(this, nanos);
		}
		else
			LockSupport.park(this);
	}
}

接下来,拆解awaitDone()方法。在awaitDone()方法中,最重要的就是for自旋循环,在循环中首先判断当前线程是否被中断,如果已经被中断,则调用removeWaiter()将当前线程从堆栈中移除,并且抛出InterruptedException异常,如下所示。

if (Thread.interrupted()) {
	removeWaiter(q);
	throw new InterruptedException();
}

接下来,判断任务的当前状态是否完成,如果完成,并且堆栈句柄不为空,则将堆栈中的当前线程设置为空,返回当前任务的状态,如下所示。

int s = state;
if (s > COMPLETING) {
	if (q != null)
		q.thread = null;
	return s;
}

当任务的状态为COMPLETING时,使当前线程让出CPU资源,如下所示。

else if (s == COMPLETING)
	Thread.yield();

如果堆栈为空,则创建堆栈对象,如下所示。

else if (q == null)
	q = new WaitNode();

如果queued变量为false,通过CAS操作为queued赋值,如果awaitDone()方法传递的timed参数为true,则计算超时时间,当时间已超时,则在堆栈中移除当前线程并返回任务状态,如下所示。如果未超时,则重置超时时间,如下所示。

else if (!queued)
	queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
	nanos = deadline - System.nanoTime();
	if (nanos <= 0L) {
		removeWaiter(q);
		return state;
	}
	LockSupport.parkNanos(this, nanos);
}

如果不满足上述的所有条件,则将当前线程设置为等待状态,如下所示。

else
	LockSupport.park(this);

接下来,回到get()方法中,当awaitDone()方法返回结果,或者任务的状态不满足条件时,都会调用report()方法,并将当前任务的状态传递到report()方法中,并返回结果,如下所示。

return report(s);

看来,这里还要看下report()方法啊,点进去看下report()方法的实现,如下所示。

private V report(int s) throws ExecutionException {
	Object x = outcome;
	if (s == NORMAL)
		return (V)x;
	if (s >= CANCELLED)
		throw new CancellationException();
	throw new ExecutionException((Throwable)x);
}

可以看到,report()方法的实现比较简单,首先,将outcome数据赋值给x变量,接下来,主要是判断接收到的任务状态,如果状态为NORMAL,则将x强转为泛型类型返回;当任务的状态大于或者等于CANCELLED,也就是任务已经取消,则抛出CancellationException异常,其他情况则抛出ExecutionException异常。

至此,get()方法分析完成。注意:一定要理解get()方法的实现,因为get()方法是我们使用Future接口和FutureTask类时,使用的比较频繁的一个方法。

(6)set()方法与setException()方法

继续看FutureTask类的代码,接下来看到的是set()方法与setException()方法,如下所示。

protected void set(V v) {
	if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
		outcome = v;
		UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
		finishCompletion();
	}
}

protected void setException(Throwable t) {
	if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
		outcome = t;
		UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
		finishCompletion();
	}
}

通过源码可以看出,set()方法与setException()方法整体逻辑几乎一样,只是在设置任务状态时一个将状态设置为NORMAL,一个将状态设置为EXCEPTIONAL。

至于finishCompletion()方法,前面已经分析过。

(7)run()方法与runAndReset()方法

接下来,就是run()方法了,run()方法的源代码如下所示。

public void run() {
	if (state != NEW ||
		!UNSAFE.compareAndSwapObject(this, runnerOffset,
									 null, Thread.currentThread()))
		return;
	try {
		Callable<V> c = callable;
		if (c != null && state == NEW) {
			V result;
			boolean ran;
			try {
				result = c.call();
				ran = true;
			} catch (Throwable ex) {
				result = null;
				ran = false;
				setException(ex);
			}
			if (ran)
				set(result);
		}
	} finally {
		// runner must be non-null until state is settled to
		// prevent concurrent calls to run()
		runner = null;
		// state must be re-read after nulling runner to prevent
		// leaked interrupts
		int s = state;
		if (s >= INTERRUPTING)
			handlePossibleCancellationInterrupt(s);
	}
}

可以这么说,只要使用了Future和FutureTask,就必然会调用run()方法来运行任务,掌握run()方法的流程是非常有必要的。在run()方法中,如果当前状态不是NEW,或者CAS操作返回的结果为false,则直接返回,不再执行后续逻辑,如下所示。

if (state != NEW ||
	!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
	return;

接下来,在try代码块中,将成员变量callable赋值给一个临时变量c,判断临时变量不等于null,并且任务状态为NEW,则调用Callable接口的call()方法,并接收结果数据。并将ran变量设置为true。当程序抛出异常时,将接收结果的变量设置为null,ran变量设置为false,并且调用setException()方法将任务的状态设置为EXCEPTIONA。接下来,如果ran变量为true,则调用set()方法,如下所示。

try {
	Callable<V> c = callable;
	if (c != null && state == NEW) {
		V result;
		boolean ran;
		try {
			result = c.call();
			ran = true;
		} catch (Throwable ex) {
			result = null;
			ran = false;
			setException(ex);
		}
		if (ran)
			set(result);
	}
}

接下来,程序会进入finally代码块中,如下所示。

finally {
	// runner must be non-null until state is settled to
	// prevent concurrent calls to run()
	runner = null;
	// state must be re-read after nulling runner to prevent
	// leaked interrupts
	int s = state;
	if (s >= INTERRUPTING)
		handlePossibleCancellationInterrupt(s);
}

这里,将runner设置为null,如果任务的当前状态大于或者等于INTERRUPTING,也就是线程被中断了。则调用handlePossibleCancellationInterrupt()方法,接下来,看下handlePossibleCancellationInterrupt()方法的实现。

private void handlePossibleCancellationInterrupt(int s) {
	if (s == INTERRUPTING)
		while (state == INTERRUPTING)
			Thread.yield();
}

可以看到,handlePossibleCancellationInterrupt()方法的实现比较简单,当任务的状态为INTERRUPTING时,使用while()循环,条件为当前任务状态为INTERRUPTING,将当前线程占用的CPU资源释放,也就是说,当任务运行完成后,释放线程所占用的资源。

runAndReset()方法的逻辑与run()差不多,只是runAndReset()方法会在finally代码块中将任务状态重置为NEW。runAndReset()方法的源代码如下所示,就不重复说明了。

protected boolean runAndReset() {
	if (state != NEW ||
		!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
		return false;
	boolean ran = false;
	int s = state;
	try {
		Callable<V> c = callable;
		if (c != null && s == NEW) {
			try {
				c.call(); // don't set result
				ran = true;
			} catch (Throwable ex) {
				setException(ex);
			}
		}
	} finally {
		// runner must be non-null until state is settled to
		// prevent concurrent calls to run()
		runner = null;
		// state must be re-read after nulling runner to prevent
		// leaked interrupts
		s = state;
		if (s >= INTERRUPTING)
			handlePossibleCancellationInterrupt(s);
	}
	return ran && s == NEW;
}

(8)removeWaiter()方法

removeWaiter()方法中主要是使用自旋循环的方式来移除WaitNode中的线程,比较简单,如下所示。

private void removeWaiter(WaitNode node) {
	if (node != null) {
		node.thread = null;
		retry:
		for (;;) {          // restart on removeWaiter race
			for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
				s = q.next;
				if (q.thread != null)
					pred = q;
				else if (pred != null) {
					pred.next = s;
					if (pred.thread == null) // check for race
						continue retry;
				}
				else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
													  q, s))
					continue retry;
			}
			break;
		}
	}
}

最后,在FutureTask类的最后,有如下代码。

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
	try {
		UNSAFE = sun.misc.Unsafe.getUnsafe();
		Class<?> k = FutureTask.class;
		stateOffset = UNSAFE.objectFieldOffset
			(k.getDeclaredField("state"));
		runnerOffset = UNSAFE.objectFieldOffset
			(k.getDeclaredField("runner"));
		waitersOffset = UNSAFE.objectFieldOffset
			(k.getDeclaredField("waiters"));
	} catch (Exception e) {
		throw new Error(e);
	}
}

关于这些代码的作用,会在后续深度解析CAS文章中详细说明,这里就不再探讨。

至此,关于Future接口和FutureTask类的源码就分析完了。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。