java 并发编程学习笔记(六)之 AQS (AbstractQueuedSynchronizer)
AQS
(1)aqs
- 使用node实现fifo队列,可以用于构建锁或者其他的同步装置的基础框架
- 利用了一个int类型表示状态
- 使用方法是继承
- 子类通过继承并通过实现它的方法管理其状态{acquire 和 release}
- 可以同时实现排他锁和共享锁模式(独占,共享)
(2)CountDownLatch
-
-
/**
-
* 一般用于 当主线程需要等待 子线程执行完成之后 再执行的场景
-
* * 线程名称 子线程结束
-
* * thread1 ---------------------- end |
-
* * thread2 ---------------------- end | 主线程 主线程结束
-
* * thread3 ---------------------- end | --------------------- end
-
* * thread4 ---------------------- end |
-
* * thread5 ---------------------- end |
-
* *
-
*
-
*/
-
@Slf4j
-
public class CountDownLatchExample1 {
-
-
private static int threadCount = 200;
-
-
public static void main(String[] args) throws InterruptedException {
-
-
ExecutorService service = Executors.newCachedThreadPool();
-
-
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
-
-
-
for (int i = 0; i < threadCount; i++) {
-
final int threadNum = i;
-
service.execute(() -> {
-
try {
-
test(threadNum);
-
} catch (InterruptedException e) {
-
log.error("exception",e);
-
} finally {
-
countDownLatch.countDown();
-
}
-
});
-
}
-
-
// countDownLatch.await();
-
countDownLatch.await(10, TimeUnit.MILLISECONDS); //超时等待
-
log.info("finish");
-
service.shutdown();
-
}
-
-
private static void test(int threadNum) throws InterruptedException {
-
log.info("{}", threadNum);
-
Thread.sleep(100);
-
}
-
}
(3)Semaphore
-
/**
-
* 一般用于控制同一时刻运行的线程数量
-
* | |
-
* ---------|----- |---------
-
* | |
-
* ---------|----- |---------
-
* | |
-
*/
-
@Slf4j
-
public class SemaphoreExample1 {
-
-
-
private static int threadCount = 20;
-
-
public static void main(String[] args) throws InterruptedException {
-
-
ExecutorService service = Executors.newCachedThreadPool();
-
-
final Semaphore semaphore = new Semaphore(3);
-
-
for (int i = 0; i < threadCount; i++) {
-
final int threadNum = i;
-
service.execute(() -> {
-
try {
-
//semaphore.acquire(); 每次拿一个许可证
-
//semaphore.acquire(3); 每次拿三个许可证
-
semaphore.tryAcquire(3);
-
semaphore.tryAcquire(1,TimeUnit.SECONDS); //尝试一秒之内获取许可
-
semaphore.tryAcquire(3,1,TimeUnit.SECONDS);
-
if(semaphore.tryAcquire()) { //尝试获取许可证 ,没有获取到直接将当前线程丢弃
-
-
test(threadNum);
-
semaphore.release();
-
}else {
-
log.info(Thread.currentThread().getName()+"我没有拿到许可证,┭┮﹏┭┮");
-
}
-
} catch (InterruptedException e) {
-
log.error("exception", e);
-
}
-
});
-
}
-
-
service.shutdown();
-
}
-
-
private static void test(int threadNum) throws InterruptedException {
-
log.info("{}", threadNum);
-
Thread.sleep(1000);
-
}
-
}
(4)CyclicBarrier
-
/**
-
* 一般用于多个线程之间相互等待,当全部都到达某个屏障点的时候在,继续执行每个线程,并且可以重复循环使用
-
* 线程名称 某个屏障点 终点
-
* thread1 ------------| ---------- end
-
* thread2 ------------| ---------- end
-
* thread3 ------------| ---------- end
-
* thread4 ------------| ---------- end
-
* thread5 ------------| ---------- end
-
*
-
*/
-
@Slf4j
-
public class CyclicBarrierExample1 {
-
-
// private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
-
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {
-
log.info("五个线程都已经准备就绪");
-
});
-
-
public static void main(String[] args) throws InterruptedException {
-
ExecutorService service = Executors.newCachedThreadPool();
-
-
for (int i = 0; i < 10; i++) {
-
final int threadNum = i;
-
Thread.sleep(1000);
-
service.execute(() -> {
-
try {
-
race(threadNum);
-
} catch (InterruptedException | BrokenBarrierException e) {
-
e.printStackTrace();
-
}
-
});
-
}
-
service.shutdown();
-
}
-
-
private static void race(int threadNum) throws InterruptedException, BrokenBarrierException {
-
Thread.sleep(1000);
-
log.info("{} is ready", threadNum);
-
cyclicBarrier.await();
-
try {
-
// cyclicBarrier.await(1, TimeUnit.SECONDS);
-
} catch (Exception e) {
-
// e.printStackTrace();
-
}
-
log.info("{} continue", threadNum);
-
}
-
}
(5)锁
锁的简单使用:
-
/**
-
* class Point {
-
* * private double x, y;
-
* * private final StampedLock sl = new StampedLock();
-
* *
-
* * void move(double deltaX, double deltaY) { // an exclusively locked method
-
* * long stamp = sl.writeLock();
-
* * try {
-
* * x += deltaX;
-
* * y += deltaY;
-
* * } finally {
-
* * sl.unlockWrite(stamp);
-
* * }
-
* * }
-
* *
-
* * double distanceFromOrigin() { // A read-only method
-
* * long stamp = sl.tryOptimisticRead();
-
* * double currentX = x, currentY = y;
-
* * if (!sl.validate(stamp)) {
-
* * stamp = sl.readLock();
-
* * try {
-
* * currentX = x;
-
* * currentY = y;
-
* * } finally {
-
* * sl.unlockRead(stamp);
-
* * }
-
* * }
-
* * return Math.sqrt(currentX * currentX + currentY * currentY);
-
* * }
-
* *
-
* * void moveIfAtOrigin(double newX, double newY) { // upgrade
-
* * // Could instead start with optimistic, not read mode
-
* * long stamp = sl.readLock();
-
* * try {
-
* * while (x == 0.0 && y == 0.0) {
-
* * long ws = sl.tryConvertToWriteLock(stamp);
-
* * if (ws != 0L) {
-
* * stamp = ws;
-
* * x = newX;
-
* * y = newY;
-
* * break;
-
* * }
-
* * else {
-
* * sl.unlockRead(stamp);
-
* * stamp = sl.writeLock();
-
* * }
-
* * }
-
* * } finally {
-
* * sl.unlock(stamp);
-
* * }
-
* * }
-
* * }}</pre>
-
* *
-
*/
-
@Slf4j
-
public class LockExample1 {
-
-
private static int clientTotal = 5000;
-
private static int threadTotal = 200;
-
private static int count = 0;
-
-
-
//重入锁
-
private final static Lock lock = new ReentrantLock();
-
-
-
//重入读写锁
-
private final static Map<Integer, Integer> map = new TreeMap<Integer, Integer>();
-
private final static ReentrantReadWriteLock reenLock = new ReentrantReadWriteLock();
-
private final static Lock readLock = reenLock.readLock();
-
private final static Lock writeLock = reenLock.writeLock();
-
-
//stamped锁
-
private final static StampedLock stampedLock = new StampedLock();
-
-
//condition
-
private final static ReentrantLock REENTRANT_LOCK = new ReentrantLock();
-
private final static Condition condition = REENTRANT_LOCK.newCondition();
-
-
//重入读写锁的使用
-
public Integer getValue(Integer key) {
-
readLock.lock();
-
Integer value = null;
-
try {
-
value = map.get(key);
-
} catch (Exception e) {
-
e.printStackTrace();
-
} finally {
-
readLock.unlock();
-
}
-
return value;
-
}
-
-
//重入读写锁的使用
-
public Set<Integer> getSet() {
-
Set<Integer> set = null;
-
readLock.lock();
-
try {
-
set = map.keySet();
-
} catch (Exception e) {
-
e.printStackTrace();
-
} finally {
-
readLock.unlock();
-
}
-
return set;
-
}
-
-
/**
-
* 重入读写锁的使用
-
* @param key
-
* @param value
-
* @return
-
*/
-
public Integer put(Integer key, Integer value) {
-
writeLock.lock();
-
try {
-
map.put(key, value);
-
} catch (Exception e) {
-
e.printStackTrace();
-
} finally {
-
writeLock.unlock();
-
}
-
return value;
-
}
-
-
/**
-
* condition的使用
-
*/
-
public static void testCond(){
-
new Thread(() -> {
-
try {
-
REENTRANT_LOCK.lock();
-
log.info("运动员获取锁");
-
condition.await();
-
log.info("运动员接收到信号,比赛开始~~~~");
-
}catch (Exception e){
-
e.printStackTrace();
-
}finally {
-
REENTRANT_LOCK.unlock();
-
}
-
}).start();
-
-
new Thread(() -> {
-
try {
-
REENTRANT_LOCK.lock();
-
log.info("裁判获取锁");
-
Thread.sleep(3000);
-
log.info("裁判发送信号");
-
condition.signalAll();
-
}catch (Exception e){
-
e.printStackTrace();
-
}finally {
-
REENTRANT_LOCK.unlock();
-
}
-
}).start();
-
}
-
-
-
public static void main(String[] args) throws InterruptedException {
-
ExecutorService service = Executors.newCachedThreadPool();
-
final Semaphore semaphore = new Semaphore(threadTotal);
-
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
-
for (int i = 0; i < clientTotal; i++) {
-
service.execute(() -> {
-
try {
-
semaphore.acquire();
-
add();
-
semaphore.release();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} finally {
-
countDownLatch.countDown();
-
}
-
});
-
}
-
countDownLatch.await();
-
service.shutdown();
-
log.info("count {}", count);
-
testCond();
-
}
-
-
/**
-
* stampedLock的使用方式
-
*/
-
private static void add() {
-
long stamp = stampedLock.writeLock();
-
try {
-
count++;
-
} catch (Exception e) {
-
} finally {
-
stampedLock.unlock(stamp);
-
}
-
}
-
}
同步锁synchronized不是JUC中的锁但也顺便提下,它是由synchronized 关键字进行同步,实现对竞争资源互斥访问的锁。
同步锁的原理:对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。在同一个时间点该同步锁能且只能被一个线程获取到,其他线程都得等待。
另外:synchronized是Java中的关键字且是内置的语言实现;它是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定;synchronized等待的线程会一直等待下去,不能响应中断。
重入锁ReentrantLock,顾名思义:就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。另外该锁孩纸获取锁时的公平和非公平性选择,所以它包含公平锁与非公平锁(它们两也可以叫可重入锁)。首先提出两个疑问:它怎么实现重进入呢?释放逻辑还跟AQS中一样吗?
ReentrantLock 独有的功能
- 可指定是公平锁还是非公共锁,公平锁就是 先等待的线程先获得锁
- 提供了一个condition类,可以分组唤醒需要的线程,
- 提供了能够中断等待锁的线程机制,lock.lockInterruptibly()
非公平锁
-
final boolean nonfairTryAcquire(int acquires) {
-
final Thread current = Thread.currentThread();
-
int c = getState();
-
if (c == 0) {
-
if (compareAndSetState(0, acquires)) {
-
setExclusiveOwnerThread(current);
-
return true;
-
}
-
}
-
// 同步状态已经被其他线程占用,则判断当前线程是否与被占用的线程是同一个线程,如果是同一个线程则允许获取,并state+1
-
else if (current == getExclusiveOwnerThread()) {
-
int nextc = c + acquires;
-
if (nextc < 0) // overflow
-
throw new Error("Maximum lock count exceeded");
-
setState(nextc);
-
return true;
-
}
-
return false;
-
}
该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功。如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
-
protected final boolean tryRelease(int releases) {
-
int c = getState() - releases;
-
if (Thread.currentThread() != getExclusiveOwnerThread())
-
throw new IllegalMonitorStateException();
-
boolean free = false;
-
if (c == 0) {
-
free = true;
-
setExclusiveOwnerThread(null);
-
}
-
setState(c);
-
return free;
-
}
上面代码是释放锁的代码。如果该锁被获取了n次,那么前(n-1)次都是返回false,直至state=0,将占有线程设置为null,并返回true,表示释放成功。
公平锁
公平锁与非公平锁有啥区别呢? 还是从源码中分析吧。
-
protected final boolean tryAcquire(int acquires) {
-
final Thread current = Thread.currentThread();
-
int c = getState();
-
if (c == 0) {
-
// 区别:增加判断同步队列中当前节点是否有前驱节点的判断
-
if (!hasQueuedPredecessors() &&
-
compareAndSetState(0, acquires)) {
-
setExclusiveOwnerThread(current);
-
return true;
-
}
-
}
-
// 一样支持重入
-
else if (current == getExclusiveOwnerThread()) {
-
int nextc = c + acquires;
-
if (nextc < 0)
-
throw new Error("Maximum lock count exceeded");
-
setState(nextc);
-
return true;
-
}
-
return false;
-
}
与非公平锁的唯一不同就是增加了一个判断条件:判断同步队列中当前节点是否有前驱节点的判断,如果方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
公平锁与非公平锁的区别
从上面源码中得知,公平性锁保证了锁的获取按照FIFO原则,但是代价就是进行大量的线程切换。而非公平性锁,可能会造成线程“饥饿”(不会保证先进来的就会先获取),但是极少线程的切换,保证了更大的吞吐量。下面我们看下案例:
-
import org.junit.Test;
-
-
import java.util.*;
-
import java.util.concurrent.*;
-
import java.util.concurrent.locks.Lock;
-
import java.util.concurrent.locks.ReentrantLock;
-
-
public class FairAndUnfairTest {
-
private static Lock fairLock = new ReentrantLock2(true);
-
private static Lock unFairLock = new ReentrantLock2(false);
-
-
@Test
-
public void fair() throws Exception{
-
testLock(fairLock);
-
}
-
-
@Test
-
public void unFairLock() throws Exception{
-
testLock(unFairLock);
-
}
-
-
private static void testLock(Lock lock) throws InterruptedException, ExecutionException {
-
ExecutorService threadPool = Executors.newFixedThreadPool(5);
-
List<Future<Long>> list = new ArrayList<>();
-
for (int i = 0 ; i < 5; i++) {
-
Future<Long> future = threadPool.submit(new Job(lock));
-
list.add(future);
-
}
-
long cost = 0;
-
for (Future<Long> future : list) {
-
cost += future.get();
-
}
-
// 查看五个线程所需耗时的时间
-
System.out.println("cost:" + cost + " ms");
-
}
-
-
private static class Job implements Callable<Long> {
-
private Lock lock;
-
public Job(Lock lock) {
-
this.lock = lock;
-
}
-
@Override
-
public Long call() throws Exception {
-
long st = System.currentTimeMillis();
-
// 同一线程获取100锁
-
for (int i =0; i < 100; i ++) {
-
lock.lock();
-
try {
-
System.out.println("Lock by[" + Thread.currentThread().getId() + "]," +
-
"Waiting by[" + printThread(((ReentrantLock2)lock).getQueuedThreads()) + "]");
-
} catch (Exception e) {
-
e.printStackTrace();
-
} finally {
-
lock.unlock();
-
}
-
}
-
// 返回100次所需的时间
-
return System.currentTimeMillis() - st;
-
}
-
-
private String printThread(Collection<Thread> list) {
-
StringBuilder ids = new StringBuilder();
-
for (Thread t : list) {
-
ids.append(t.getId()).append(",");
-
}
-
return ids.toString();
-
}
-
}
-
-
private static class ReentrantLock2 extends ReentrantLock {
-
public ReentrantLock2(boolean fair) {
-
super(fair);
-
}
-
-
public Collection<Thread> getQueuedThreads() {
-
List<Thread> arrayList = new ArrayList<>(super.getQueuedThreads());
-
Collections.reverse(arrayList);
-
return arrayList;
-
}
-
}
-
}
非公平性锁的测试结果,cost:117 ms
公平性锁的测试结果,cost:193 ms
读写锁
读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁(同一时刻只允许一个线程进行访问)有了很大的提升。
下面我们看下它有啥特性:
特性 |
说明 |
公平性选择 |
支持非公平(默认)和公平的所获取方式,吞吐量还是非公平优于公平 |
可重入 |
该锁支持可重进入。 读线程在获取了读锁之后能够再次获取读锁。 写线程在获取了写锁之后能够再次获取写锁。 |
锁降级 |
遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成读锁。 |
排他性 |
当写线程访问时,其他读写线程均被阻塞 |
另外读写锁是采取一个整型变量来维护多种状态。高16位表示读,低16位表示写。
// 偏移位
-
static final int SHARED_SHIFT = 16;
-
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
-
// 读写线程允许占用的最大数
-
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
-
// 独占标志
-
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
下面从源码中找出这些特性,先看下写锁的实现:
-
1 protected final boolean tryAcquire(int acquires) {
-
2
-
3 Thread current = Thread.currentThread();
-
4 int c = getState();
-
5 // 表示独占个数,也就是与低16位进行与运算。
-
6 int w = exclusiveCount(c);
-
7 if (c != 0) {
-
8 // c!=0 且 w==0表示不存在写线程,但存在读线程
-
9 if (w == 0 || current != getExclusiveOwnerThread())
-
10 return false;
-
11 if (w + exclusiveCount(acquires) > MAX_COUNT)
-
12 throw new Error("Maximum lock count exceeded");
-
13 /**
-
14 * 获取写锁的条件:
-
15 * 不能存在读线程且当前线程是当前占用锁的线程(这里体现可重入性和排他性);
-
16 * 当前占用锁的次数不能超过最大数
-
17 */
-
18 setState(c + acquires);
-
19 return true;
-
20 }
-
21 if (writerShouldBlock() ||
-
22 !compareAndSetState(c, c + acquires))
-
23 return false;
-
24 setExclusiveOwnerThread(current);
-
25 return true;
-
26 }
-
27 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
获取读锁源码如下:
-
protected final int tryAcquireShared(int unused) {
-
Thread current = Thread.currentThread();
-
int c = getState();
-
/**
-
* exclusiveCount(c) != 0: 表示有写线程在占用
-
* getExclusiveOwnerThread() != current : 当前占用锁的线程不是当前线程。
-
* 如果上面两个条件同时满足,则获取失败。
-
* 上面表明如果当前线程是拥有写锁的线程可以获取读锁(体现可重入和锁降级)。
-
*/
-
if (exclusiveCount(c) != 0 &&
-
getExclusiveOwnerThread() != current)
-
return -1;
-
int r = sharedCount(c);
-
if (!readerShouldBlock() &&
-
r < MAX_COUNT &&
-
compareAndSetState(c, c + SHARED_UNIT)) {
-
if (r == 0) {
-
firstReader = current;
-
firstReaderHoldCount = 1;
-
} else if (firstReader == current) {
-
firstReaderHoldCount++;
-
} else {
-
HoldCounter rh = cachedHoldCounter;
-
if (rh == null || rh.tid != getThreadId(current))
-
cachedHoldCounter = rh = readHolds.get();
-
else if (rh.count == 0)
-
readHolds.set(rh);
-
rh.count++;
-
}
-
return 1;
-
}
-
return fullTryAcquireShared(current);
-
}
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/84282937
- 点赞
- 收藏
- 关注作者
评论(0)