【JUC并发编程】 详解锁与队列
📢📢📢📣📣📣
哈喽!大家好,我是【Bug 终结者】 ,【CSDNJava领域优质创作者】🏆,阿里云受邀专家博主🏆,51CTO TOP红人🏆 .
一位上进心十足,拥有极强学习力的【Java领域博主】😜😜😜
🏅【Bug 终结者】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 偶尔会分享些前端基础知识,会更新实战项目,面向企业级开发应用!
🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞
❤️❤️❤️ 感谢各位大可爱小可爱! ❤️❤️❤️
@[TOC]
一、锁的定义
☁️锁机制
所谓的锁,可以理解为内存中的一个整型数,拥有两种状态:空闲状态和上锁状态。
通过锁机制,能够保证在多核多线程环境中,在某一个时间点上,只能有一个线程进入临界区代码,从而保证临界区中操作数据的一致性。
所谓的锁,可以理解为内存中的一个整型数,拥有两种状态:空闲状态和上锁状态。加锁时,判断锁是否空闲,如果空闲,修改为上锁状态,返回成功。如果已经上锁,则返回失败。解锁时,则把锁状态修改为空闲状态。
二、Lock锁 与 Synchronized
传统的 Synchronized
加到代码块或声明方法中实现线程安全
但是会消耗性能
Lock锁
加锁与解锁
Lock锁实现类
Lock实现类公平锁与非公平锁
ReentrantLock
// 默认是非公平锁,在构造的时候传入true就说公平锁
//Ctrl + Alt + T 光标放在方法上 调出快捷方式 try-catch
Synchronized 和 Lock的区别
- 两者默认都是 非公平锁,这样可以提高性能
- Synchronized 是内置的Java关键字 , Lock是一个Java类
- Synchronized 无法判断获取锁的状态, Lock可以判断是否获取到了锁
- Synchronized 会自动释放锁,Lock不会自动释放,需要手动关闭锁,如果不释放,会造成死锁
- Synchronized 线程1(获得锁,阻塞)、线程2(等待,等待…); Lock就不一定会等待下去;Lock会去尝试获取锁 tryLock
- Synchronized 可重入锁,不可以中断,非公平;Lock 可重入锁,可以去判断锁,可以自行设置公平与非公平锁,传入boolean来选择锁
- Synchronized 适合锁少量的代码同步问题;Lock锁适合锁大量的同步代码
三、多线程下的引发的问题
⭐经典案例:生产者与消费者问题
生产者与消费者问题描述了两个线程进程 — 所谓的生产者与消费者,在实际运行时会发生原子性(数据不一致)问题。生产者的作用是一直投递数据,而消费者的作用就是一直不停的消费数据。该问题的关键就是要保证当生产者生产完毕产品后,若消费者没有消费产品,那么则停止生产,等待消费者消费完毕后继续生产产品,若生产者未生产产品,消费者已经消费完毕了产品,那么消费者等待生产者生产产品,生产者生产完毕后,消费者进行消费。
JUC版的生产者与消费者问题
通过Lock 找到 Condition
传统 和JUC版本
JUC 实现生产者与消费者
package com.wanshi.productorcustomer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 线程之间的通信问题:生产者与消费者问题!等待唤醒,通知唤醒
* 线程交替执行, Productor Customer 操作同一个变量,num = 0
* C:num+1
* P:num-1
* */
public class Productor2 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
//判断是否需要等待,业务,通知
//数字资源类
class Data2 {
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1 操作
public void increment() throws InterruptedException {
try {
lock.lock();
while (num != 0) {
//等待
condition.await();
}
num ++;
System.out.println(Thread.currentThread().getName() + " --- >" + num);
//通知其它线程,+1完毕
condition.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
//-1 操作
public void decrement() throws InterruptedException {
try {
lock.lock();
while (num == 0) {
//等待
condition.await();
}
num --;
System.out.println(Thread.currentThread().getName() + " --- >" + num);
//通知其它线程,-1完毕
condition.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
任何一个新的技术,绝对不是仅仅只是覆盖了原来的技术,有自己独特的优势和补充原来的技术
Condition
四、读写锁
读写锁的定义
读写锁是指两个锁,读锁和写锁。
为什么会存在读写锁呢?
- 因为synchronized粒度太大了,并不适合我们,可重入锁的粒度相较于读锁(共享锁)也较大,我们需要粒度小的锁。
- 大部分场景下,读不需要加锁,而写需要加锁,因为写入不加锁有可能出现写入覆盖和信息不一致的情况,并且大部分的读需求粒度更小的锁,这样会占用更少的资源。
独占锁(写锁) —次只能被一个线程占有
共享锁(读锁) 多个线程可以同时占有
ReadWriteLock 读写锁
package com.wanshi.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 独占锁(写锁) 一次只能被一个线程占用
* 共享锁(读锁)多个线程可以同时占用
* 读写锁
* ReadWriteLock
* 读-读 可以共存
* 读-写 不能共存
* 写-写 不能共存
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//写入
for (int i = 1; i <= 5; i++) {
final int type = i;
new Thread(() -> {
myCache.put(type+"", type);
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int type = i;
new Thread(() -> {
myCache.get(type + "");
}, String.valueOf(i)).start();
}
}
}
/**
* 自定义缓存
*/
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
//读写锁:更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,写,只希望又一个线程写
public void put(String key, Object val) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, val);
System.out.println(Thread.currentThread().getName() + "写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//取,读,多个线程可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取");
System.out.println(map.get(key));
System.out.println(Thread.currentThread().getName() + "读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
运行效果
五、常用的辅助类(必会)
☀️CountDownLatch
package com.wanshi.add;
import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Go out");
//数量-1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
//等待计数器归0 然后再向下执行
countDownLatch.await();
System.out.println("Close Door");
}
}
原理:
countDownLatch.countDown();
数量-1
countDownLatch.await();
等待计数器归0,然后再往下执行
每次有线程调用 countDown() 就会数量-1,假设计数器变为0,await方法就会被唤醒,继续执行
⛅CycliBarrier
package com.wanshi.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CycliBarrierDemo {
public static void main(String[] args) {
// 集齐7颗龙珠召唤神龙
//召唤龙珠线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集" + temp + "颗龙珠");
try {
//等待7个线程执行完毕
cyclicBarrier.await();
System.out.println("abc");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
⛄Semaphore
Semaphore:信号量
抢车位!
6个车 – 3个停车位
package com.wanshi.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//限流
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
//acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
//release() 释放
}, String.valueOf(i)).start();
}
}
}
semaphore.acquire();
获得,如果已经满了就等待被释放为止!
semaphore.release();
释放,会将当前的信号量释放 + 1, 然后唤醒等待的线程!
作用:多个共享资源互斥的使用,并发限流,控制最大的线程数
六、其它常用锁
⌛公平锁、非公平锁
公平锁:非常公平,不可以插队,必须先来先到
非公平锁:非常不公平,可以插队(默认都是非公平锁,目的是为了保证效率)
//Lock 锁实现类,默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//传入true代表改变为公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
⚡可重入锁
可重入锁又叫递归锁,指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以获得锁使用并且不发生死锁,这样的锁就叫做可重入锁。 简单的来说就是: 在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到的。
Java中 ReentrantLock 和 synchronized 都是可重入锁,可重入锁能一定程度避免死锁。
Synchronized版本可重入锁
package com.wanshi.lock;
//Synchronized版本
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
}
}
class Phone {
public synchronized void sms() {
System.out.println(Thread.currentThread().getName() + " ---> 发信息...");
call();
}
public synchronized void call() {
System.out.println(Thread.currentThread().getName() + " ---> 打电话....");
}
}
Lock版本可重入锁
package com.wanshi.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
}
}
class Phone2 {
Lock lock = new ReentrantLock();
//注意:Lock锁必须成对出现,如果不是成对出现的,会出现死锁现象
public void sms() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " ---> 发信息...");
call();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " ---> 打电话....");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
➿自旋锁
自旋锁: spinLock,指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的开销,缺点是循环会消耗CPU资源。
自旋锁
自己实现自旋锁
package com.wanshi.lock;
import java.util.concurrent.atomic.AtomicReference;
public class SpinlockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
//加锁
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " ---> myLock");
//期待更新的转换为线程 A线程进来后直接退出了,B线程进来后自旋,等待A线程结束后B线程结束自旋。
while (!atomicReference.compareAndSet(null, thread)) {
}
}
//解锁
public void myUnLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " ---> myUnLock");
atomicReference.compareAndSet(thread, null);
}
}
测试自旋锁
package com.wanshi.lock;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
SpinlockDemo lock = new SpinlockDemo();
new Thread(() -> {
lock.myLock();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
lock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
}, "B").start();
}
}
✂️死锁
死锁:两个线程持有自己的资源尝试去获取其它线程的资源,其它线程未释放,导致阻塞,最终形成僵持,造成死锁
产生死锁的原因主要包括:
- 系统资源不足
- 程序执行的顺序有问题
- 资源分配不当等
怎么排除死锁:
-
互斥条件:一个线程每次只能被一个进程调用
-
不可剥夺条件:进程所获得的资源在未使用完毕之前,不被其他进程强行剥夺,而只能由获得该资源的进程资源释放。
-
请求和保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放
-
循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系
以上给出了导致死锁的四个必要条件,只要系统发生死锁则以上四个条件至少有一个成立。事实上循环等待的成立蕴含了前三个条件的成立,似乎没有必要列出然而考虑这些条件对死锁的预防是有利的,因为可以通过破坏四个条件中的任何一个来预防死锁的发生。
死锁案例
A想拿B,B想拿A,互相僵持着
package com.wanshi.lock;
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "A").start();
new Thread(new MyThread(lockB, lockA), "B").start();
}
}
class MyThread implements Runnable {
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " lock: " + lockA + " --> get lock" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " lock: " + lockB + " --> get lock" + lockA);
}
}
}
}
解决问题
- 使用
jps -l
查看进程号
- 使用
jstack 进程号
查看堆栈信息
死锁预防,确保系统永远不会进入死锁状态
我们可以通过破坏死锁产生的4个必要条件来 预防死锁,由于资源互斥是资源使用的固有特性是无法改变的。
- 破坏“不可剥夺”条件:一个进程不能获得所需要的全部资源时便处于等待状态,等待期间他占有的资源将被隐式的释放重新加入到 系统的资源列表中,可以被其他的进程使用,而等待的进程只有重新获得自己原有的资源以及新申请的资源才可以重新启动,执行。
- 破坏”请求与保持条件“:第一种方法静态分配即每个进程在开始执行时就申请他所需要的全部资源。第二种是动态分配即每个进程在申请所需要的资源时他本身不占用系统资源。
- 破坏“循环等待”条件:采用资源有序分配其基本思想是将系统中的所有资源顺序编号,将紧缺的,稀少的采用较大的编号,在申请资源时必须按照编号的顺序进行,一个进程只有获得较小编号的进程才能申请较大编号的进程
七、阻塞队列
⛽阻塞队列的定义
阻塞思想:无法得到想要的资源时,线程等待资源,阻塞,等到资源可用时唤醒
阻塞队列
类结构
BlokingQueue 不是新的东西
多线程并发处理,线程池会用到 BlokingQueue
使用队列
JDK提供的阻塞队列
JDK提供了7种阻塞队列。如下:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingQueue:由链表结构组成的双向阻塞队列
♨️常用阻塞队列
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞 等待 | 时等待 |
---|---|---|---|---|
添加 | add | offer() | put | offer(,) |
移除 | remove | pull() | take | pull(,) |
判断队列首 | element | peek | - | - |
ArrayBlockingQueue详细使用:
package com.wanshi.bq;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
test2();
}
/**
* 抛出异常
*/
public static void test1() {
//队列的大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
System.out.println(arrayBlockingQueue.add("a"));
System.out.println(arrayBlockingQueue.add("b"));
System.out.println(arrayBlockingQueue.add("c"));
//llegalStateException: Queue full 抛出异常
// System.out.println(arrayBlockingQueue.add("d"));
System.out.println(arrayBlockingQueue.remove());
//获取队列首
System.out.println(arrayBlockingQueue.element());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
// System.out.println(arrayBlockingQueue.remove());
}
public static void test2() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
arrayBlockingQueue.offer("a");
arrayBlockingQueue.offer("b");
arrayBlockingQueue.offer("c");
arrayBlockingQueue.offer("d");
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
//获取队列首
System.out.println(arrayBlockingQueue.peek());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
}
public static void test3() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
// arrayBlockingQueue.put("d");
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
}
public static void test4() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
arrayBlockingQueue.offer("a");
arrayBlockingQueue.offer("b");
arrayBlockingQueue.offer("c");
arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS);
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
}
}
SynchronousQueue 同步队列
package com.wanshi.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingDeque = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "put 1");
blockingDeque.put("1");
System.out.println(Thread.currentThread().getName() + "put 2");
blockingDeque.put("2");
System.out.println(Thread.currentThread().getName() + "put 3");
blockingDeque.put("3");
} catch (Exception e) {
}
},"T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "=>" + blockingDeque.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "=>" + blockingDeque.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "=>" + blockingDeque.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
运行效果
⛵小结
以上就是【Bug 终结者】对 【JUC并发编程】 详解锁与队列简单的概述,锁机制很重要,队列是提升我们的性能的很好的工具,锁机制为我们的多线程提供了良好的支持,解决了线程安全问题,锁机制和队列在面试中问到的概率很大,所以说我们应该加强这方面的学习,认真通读,势必拿下锁与队列!
如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!
- 点赞
- 收藏
- 关注作者
评论(0)