java 并发编程学习笔记(七)FutureTask, ForkJoin, BlockingQueue
(1)Future 、FutureTask
-
public class FutureExample {
-
-
-
static class MyTask implements Callable<String> {
-
@Override
-
public String call() throws Exception {
-
return "10000";
-
}
-
}
-
-
public static void main(String[] args) throws ExecutionException, InterruptedException {
-
//Future
-
ExecutorService service = Executors.newCachedThreadPool();
-
Future<String> future = service.submit(new MyTask());
-
String result = future.get();
-
System.out.println(result);
-
-
//FutureTask
-
FutureTask<String> futureTask = new FutureTask<String>(() -> {
-
return "10000";
-
});
-
new Thread(futureTask).start();
-
System.out.println(futureTask.get());
-
-
-
}
-
}
(2)ForkJoin
背景
虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。
Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。它非常类似于HADOOP提供的MapReduce框架,只是MapReduce的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin更加类似于单机版的MapReduce。
工作窃取算法
指的是某个线程从其他队列里窃取任务来执行。使用的场景是一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行(任务是一个个独立的小任务),这样感觉A线程像是小偷在窃取B线程的东西一样。
工作窃取算法的优点:
利用了线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:
1、如果双端队列中只有一个任务时,线程间会存在竞争。
2、窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。
框架设计
Fork/Join中两个重要的类:
1、ForkJoinTask:使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制。一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有两个:
a、RecursiveAction:用于没有返回结果的任务。
b、RecursiveTask:用于有返回结果的任务。
2、ForkJoinPool:任务ForkJoinTask需要通过ForkJoinPool来执行。
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.ForkJoinPool;
-
import java.util.concurrent.Future;
-
import java.util.concurrent.RecursiveTask;
-
-
public class CountTask extends RecursiveTask<Integer> {
-
private static final long serialVersionUID = 1L;
-
// 阈值
-
private static final int THRESHOLD = 2;
-
private int start;
-
private int end;
-
-
public CountTask(int start, int end) {
-
this.start = start;
-
this.end = end;
-
}
-
-
@Override
-
protected Integer compute() {
-
int sum = 0;
-
// 判断任务是否足够小
-
boolean canCompute = (end - start) <= THRESHOLD;
-
if (canCompute) {
-
// 如果小于阈值,就进行运算
-
for (int i = start; i <= end; i++) {
-
sum += i;
-
}
-
System.out.println(Thread.currentThread().getName()+" A sum:"+sum);
-
} else {
-
// 如果大于阈值,就再进行任务拆分
-
int middle = (start + end) / 2;
-
System.out.println(Thread.currentThread().getName()+" start:"+start+",middle:"+middle+",end:"+end);
-
CountTask leftTask = new CountTask(start, middle);
-
CountTask rightTask = new CountTask(middle + 1, end);
-
// 执行子任务
-
leftTask.fork();
-
rightTask.fork();
-
// 等待子任务执行完,并得到执行结果
-
int leftResult = leftTask.join();
-
int rightResult = rightTask.join();
-
// 合并子任务
-
sum = leftResult + rightResult;
-
System.out.println(Thread.currentThread().getName()+" B sum:"+sum);
-
}
-
return sum;
-
}
-
-
public static void main(String[] args) {
-
ForkJoinPool forkJoinPool = new ForkJoinPool();// 这边也可以指定一个最大线程数
-
CountTask task = new CountTask(1, 10);
-
// 执行一个任务
-
Future<Integer> result = forkJoinPool.submit(task);
-
try {
-
System.out.println(result.get());
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
-
}
-
-
}
这个程序是将1+2+3+4+5+6拆分成1+2;3+4;5+6三个部分进行子程序进行计算后合并。
源码解读
1、leftTask.fork();
-
1 public final ForkJoinTask<V> fork() {
-
2 Thread t;
-
3 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
-
4 ((ForkJoinWorkerThread)t).workQueue.push(this);
-
5 else
-
6 ForkJoinPool.common.externalPush(this);
-
7 return this;
-
8 }
fork方法内部会先判断当前线程是否是ForkJoinWorkerThread的实例,如果满足条件,则将task任务push到当前线程所维护的双端队列中。
-
1 final void push(ForkJoinTask<?> task) {
-
2 ForkJoinTask<?>[] a; ForkJoinPool p;
-
3 int b = base, s = top, n;
-
4 if ((a = array) != null) { // ignore if queue removed
-
5 int m = a.length - 1; // fenced write for task visibility
-
6 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
-
7 U.putOrderedInt(this, QTOP, s + 1);
-
8 if ((n = s - b) <= 1) {
-
9 if ((p = pool) != null)
-
10 p.signalWork(p.workQueues, this);
-
11 }
-
12 else if (n >= m)
-
13 growArray();
-
14 }
-
15 }
在push方法中,会调用ForkJoinPool的signalWork方法唤醒或创建一个工作线程来异步执行该task任务。
-
public final V join() {
-
int s;
-
if ((s = doJoin() & DONE_MASK) != NORMAL)
-
reportException(s);
-
return getRawResult();
-
}
通过doJoin方法返回的任务状态来判断,如果不是NORMAL,则抛异常:
-
private void reportException(int s) {
-
if (s == CANCELLED)
-
throw new CancellationException();
-
if (s == EXCEPTIONAL)
-
rethrow(getThrowableException());
-
}
来看下doJoin方法:
-
private int doJoin() {
-
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
-
return (s = status) < 0 ? s :
-
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
-
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
-
tryUnpush(this) && (s = doExec()) < 0 ? s :
-
wt.pool.awaitJoin(w, this, 0L) :
-
externalAwaitDone();
-
}
先查看任务状态,如果已经完成,则直接返回任务状态;如果没有完成,则从任务队列中取出任务并执行。
(3)BlockingQueue
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.
BlockingQueue的核心方法:
-
public interface BlockingQueue<E> extends Queue<E> {
-
-
//将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
-
boolean add(E e);
-
-
//将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
-
boolean offer(E e);
-
-
//将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
-
void put(E e) throws InterruptedException;
-
-
//将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
-
boolean offer(E e, long timeout, TimeUnit unit)
-
throws InterruptedException;
-
-
//从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
-
E take() throws InterruptedException;
-
-
//在给定的时间里,从队列中获取值,时间到了直接调用普通的poll方法,为null则直接返回null。
-
E poll(long timeout, TimeUnit unit)
-
throws InterruptedException;
-
-
//获取队列中剩余的空间。
-
int remainingCapacity();
-
-
//从队列中移除指定的值。
-
boolean remove(Object o);
-
-
//判断队列中是否拥有该值。
-
public boolean contains(Object o);
-
-
//将队列中值,全部移除,并发设置到给定的集合中。
-
int drainTo(Collection<? super E> c);
-
-
//指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
-
int drainTo(Collection<? super E> c, int maxElements);
-
}
阻塞队列的成员
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded(有界) | 加锁 | arrayList |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedList |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
SynchronousQueue | bounded | 加锁 | 无 |
LinkedTransferQueue | unbounded | 加锁 | heap |
LinkedBlockingDeque | unbounded | 无锁 | heap |
-
ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】
- LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
- PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
- DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
- SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
- LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
-
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/84302754
- 点赞
- 收藏
- 关注作者
评论(0)