JUC快速入门各个知识点汇总(中)

举报
长路 发表于 2022/11/23 00:35:37 2022/11/23
1.1k+ 0 0
【摘要】 文章目录前言各类锁汇总相关锁知识点可重入锁与不可重入锁乐观锁与悲观锁自旋锁(含自定义自旋锁)知识补充上下文切换CPU多层缓存架构介绍CPU的三级缓存缓存一致性协议导致的问题一、初识JUC1.1、JUC是什么?1.2、JUC三个包介绍java.util.concurrent包java.util.concurrent.locks包(含两个模板)java.util.concurrent.atomic包1

四、并发容器

4.1、认识CopyOnWrite容器

Copy-On-Write简称COW,是一种用于程序设计中的优化策略。一开始都共享同一个内容,当要进行修改或写操作的时候,会把内容先Copy出去紧接着一个新的内容之后改变原有的共同内容,这是一种延时懒惰策略。

CopyOnWrite容器:即写时复制的容器,当我们往一个容器中添加元素时,并不会直接往当前的容器中添加,而是先将当前容器进行copy一份复制出一份新的容器,接着向新的容器中添加元素,在添加元素之后,再将新的容器引用给原有的容器变量。针对于写会上锁

  • 对于读操作并不会上锁,始终会读到原有容器中的内容。

重点:写操作是针对于新的容器添加,而读操作是针对于旧的容器读。所以CopyOnWrite容器也是一种读写分离的思想,即读和写不同的容器。

JDK1.5开始Java并发包中提供了两个使用CopyOnWrite机制实现的并发容器分别是CopyOnWriteArrayListCopyOnWriteArraySet



4.2、ConcurrentModificationException异常

问题描述

demo见demo3目录下的CopyOnWriteListTest.java

问题描述:多线程情况下使用普通集合来进行读写操作会报出异常,例如下面情况:add()表示写,sout表示读。

public class CopyOnWriteListTest {
    public static void main(String[] args) {
        List<Object> list = new ArrayList<>();
        //多线程下进行add操作
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                list.add(UUID.randomUUID().toString());
                System.out.println(list);
            }
            //System.out.println(list);
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                list.add(UUID.randomUUID().toString());
                System.out.println(list);
            }
            //System.out.println(list);
        },"B").start();

    }
}

image-20210326164926824

  • java.util.ConcurrentModificationException:并发修改异常。


问题分析及解决(源码分析)

问题分析过程(通过源码定位)

ArrayList中其方法并不是线程安全的,在多线程情况下进行读写操作会出现ConcurrentModificationException并发修改异常,什么原因导致呢?

  • 我们看其中的报错,源头是println() -> valueOf() -> AbstractCollection.toString() -> Itr.next() -> Itr.checkForComodification
//ArraryList类中的Itr类里的方法
final void checkForComodification() {
    if (modCount != expectedModCount)
        //若是不相等,则会抛出并发修改异常!!!
        throw new ConcurrentModificationException();
}
  • 原因是modCountexpectedModCount不相等抛出的异常。那么我们此时有个问题modCountexpectedModCount指的是什么?

我们println()输出list集合实际上是遍历集合中的迭代器(Itr,实现Iterable接口),将每个元素通过StringBuilder()合并起来(关键是其中的遍历迭代器操作)。

首先看ArrayList中的迭代器Itr

public abstract class AbstractList<E> extends AbstractCollection<E> implements List<E> {
	protected transient int modCount = 0;//表示的是修改的次数()
}

public class ArrayList<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable
    //ArrayList中的迭代器
	private class Itr implements Iterator<E> {
        int cursor;        //下一个要访问元素的索引
        int lastRet = -1;  //上一个访问元素的索引
        int expectedModCount = modCount;//期望修改的值(即使用迭代器时修改的值)
        
        //next()方法获取下一个值
        public E next() {
            //进行检查操作(重点,本部分出现情况)
            checkForComodification();
            int i = cursor;
            if (i >= size)
                throw new NoSuchElementException();
            Object[] elementData = ArrayList.this.elementData;
     //若是下一个要访问的索引>=数组中的长度(表示获取不到了,针对于多线程情况下其他线程删除操作导致原本数组长度变小抛出异常)
            if (i >= elementData.length)
                throw new ConcurrentModificationException();
            //索引+1
            cursor = i + 1;
            //返回对应索引下标的元素
            return (E) elementData[lastRet = i];
        }
        
        //检查方法
        final void checkForComodification() {
            //若是迭代器中修改的值与集合中修改的值不一致时说明同时进行了读写操作
            //报出并发修改异常
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
        }
    }

    //ArrayList的写操作add()方法
	public boolean add(E e) {
        //其中我们跳过中间调用方法直接看46行的方法(指定修改次数+1)
        ensureCapacityInternal(size + 1);  // Increments modCount!! 增加修改次数(modCount)
        elementData[size++] = e;
        return true;
    }

	//ArrayList中的确保方法
    private void ensureExplicitCapacity(int minCapacity) {
        modCount++;//集合修改次数+1
        if (minCapacity - elementData.length > 0)
            grow(minCapacity);
    }
}

通过源码分析快速简洁说明:在sout输出集合过程中实际上遍历的是迭代器。迭代器在执行next()方法获取值时(相当于读操作)会首先进行判断modCount != expectedModCount

  • modCount指的是针对于集合中的修改次数(如add()remove(),执行时会modCount++,并不直接对于迭代器)。
  • expectedModCount指的是在迭代器中的修改次数(如remove(),执行时会expectedModCount=modCount)。

在上面这种情况两个线程分别包含读(迭代器next())、写(集合的add())操作,若是线程A在对迭代器中执行next()中的检查方法的同时,其他线程B对于集合add()方法执行多次,其modCount也就会多次+1,该过程中expectedModCount值并不会变,自然轮到线程A进行检查方法判断则会出现false情况了,就会报出异常(通过简单的调用次数判断就能够来发现你是否出现了并发修改异常真的是秒啊)。

源码设计者通过这种方式能够阻止在多线程并发时读写出现的问题来抛出异常,不得不配合其设计能力!!!

注意:其实并不仅仅针对于checkForComodification()会抛出异常,还有上面21行判断读取的值是否>=数组的长度也能够检测出是否出现并发修改异常!


列举出现该异常情况

1、单线程情况下,普通集合迭代器遍历时删除时不使用Itrremove()方法,而使用集合中的remove方法删除同样出现该异常情况。

2、多线程情况下,普通集合进行读(迭代器遍历,如调用toString()方法)、写(调用集合中方法)会抛出异常。

3、多线程情况下,使用Vector进行迭代器遍历、写操作时,也会抛出该异常。

  • 尽管Vector中的方法都采用了Synchronized进行了同步,若多个线程都对一个集合进行读(迭代器)、写操作会抛出该异常,因为就算是每个方法同步了,例如线程A在执行遍历迭代器next()方法结束后释放锁,线程B会执行add()方法,执行完后(modcount++)也释放锁,接着线程A再次遍历时,该迭代器中的expectedModCountmodCount会出现不等,则又会抛出异常!

  • //下面情况尽管使用Vector也会出现并发修改异常
    public class Test {
        //其有一个共同操纵属性modCount
        static List<Integer> list = new Vector<Integer>();
        public static void main(String[] args)  {
            list.add(1);
            list.add(2);
            list.add(3);
            list.add(4);
            list.add(5);
            Thread thread1 = new Thread(){
                public void run() {
                    Iterator<Integer> iterator = list.iterator();
                    while(iterator.hasNext()){
                        Integer integer = iterator.next();
                        System.out.println(integer);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
            };
            Thread thread2 = new Thread(){
                public void run() {
                    Iterator<Integer> iterator = list.iterator();
                    while(iterator.hasNext()){
                        Integer integer = iterator.next();
                        if(integer==2)
                            iterator.remove();
                    }
                };
            };
            thread1.start();
            thread2.start();
        }
    }
    

解决方案

解决方案:①在使用iterator迭代器时候使用synchronizedlock进行同步。②使用并发容器CopyOnWriteArrayList来代替ArrayListVector

若是Vector直接输出list集合则是不会出现线程安全问题的,直接输出的话实际上调用的是其toString()方法(线程安全的),其中包裹着遍历迭代器。



4.3、读写集合类及并发类

下面列举的三个类都是线程安全的,其都具有各自的特点!


①CopyOnWriteArrayList

image-20210330135158338

源码分析

前面也介绍了这类容器是读写分离的,读的是旧容器,写的是新容器(写完之后重新赋值引用)。

我们看下其源码中究竟是怎么做的?

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    //默认是使用的可重入锁(非公平锁)
    final transient ReentrantLock lock = new ReentrantLock();
    private transient volatile Object[] array;
    
    final void setArray(Object[] a) {
        array = a;
    }

    //写操作
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;//首先获取到可重入锁
        //上锁
        lock.lock();
        try {
            Object[] elements = getArray();//获取到原有的对象数组
            int len = elements.length;
            //复制一份新的对象数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //注意!!!写操作实际上是在新的对象数组中完成的,并且本部分内容是具有原子性的
            newElements[len] = e;
            //新的对象数组赋值完之后将新的容器引用给原来的变量名
            setArray(newElements);
            return true;//返回true表示添加成功
        } finally {
            lock.unlock();
        }
    }
    
    //读操作
    public E get(int index) {
        return get(getArray(), index);//调用36行方法
    }
    
    private E get(Object[] a, int index) {
        //直接返回读到的内容,并没有上锁或者同步方法(因为使用是对旧的容器读不会与之前的写操作造成冲突)
        return (E) a[index];
    }
}

demo见demo3目录下的CopyOnWriteListTest.java

程序描述:用于测试CopyOnWriteList是否在多线程下有线程安全问题。

/**
 * @ClassName CopyOnWriteList
 * @Author ChangLu
 * @Date 2021/3/26 14:10
 * @Description CopyOnWriteList:测试读写分离
 */
public class CopyOnWriteListTest {
    public static void main(String[] args) {
        //问题描述:List<Object> list = new ArrayList<>(); 线程不安全的
        //解决1:Collections.synchronizedList(new ArrayList<>()) 线程安全的(实际就是包裹了一层synchronized)
        //解决2:new Vector<>() 线程安全的(jdk1.0就出现了,其所有方法都是同步的)
        //解决3:使用CopyOnWriteArrayList(读写分离)
        List<Object> list = new CopyOnWriteArrayList<>();
        //多线程下进行add操作
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                list.add(UUID.randomUUID().toString());
                System.out.println(list);
            }
            //System.out.println(list);
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                list.add(UUID.randomUUID().toString());
                System.out.println(list);
            }
            //System.out.println(list);
        },"B").start();

    }
}

image-20210328161124121

  • 测试结果是正确的,并没有在读写操作过程中发生异常。

注意点(建议)

  1. 减少扩容开销,根据实际需要初始化CopyOnWriteMap的大小,避免多次写入时进行扩容操作。
  2. 尽量使用批量添加方法addAll(),防止每次添加时容器都会进行复制一份,减少内存开销。

优点:读写分离,提升性能。

缺点:内存占用问题与数据一致性问题。

  • 内存占用问题:每次添加一个元素时都会复制一份新的数组。
  • 数据一致性问题:不要期望每次读取数据时都能够读到最新添加进入的数据。有时可能会读到旧的容器中数据。

改进

  1. 针对于内存占用问题,可以通过压缩容器中的元素来减少对象的内存,例如10进制的数字可以压缩为更大的进制表示,如16位、32位、64位。若不使用该容器可使用其他容器如ConcurrentHashMap
  2. 若是你希望写入的数据实时被读取出来建议不要使用该并发容器。


②CopyOnWriteArraySet

image-20210330135233847

demo见demo3包下的CopyOnWriteSetTest.java

程序描述:解决普通Set集合框架在多线程下出现的安全问题,可以使用CollectionssynchronizedSet()来包装或者使用CopyOnWriteArraySet(读写分离):

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @ClassName CopyOnWriteSetTest
 * @Author ChangLu
 * @Date 2021/3/28 21:50
 * @Description CopyOnWriteSet(读写分离Set并解决线程安全问题)
 */
public class CopyOnWriteSetTest {
    //private static Set<String> set = new HashSet<>();:多线程下有线程安全问题
    //1、Collections.synchronizedSet(new HashSet<>()):使用工具类来包装一下,线程安全
    //2、new CopyOnWriteArraySet():使用并发包中的cow读写分离的set,线程安全的!
    private static Set<String> set = new CopyOnWriteArraySet();

    public static void main(String[] args) {
        //开辟20个线程
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                for (int j = 0; j < 20; j++) {
                    set.add(UUID.randomUUID().toString());
                }
                System.out.println(set);
            }).start();
        }

    }
}

image-20210328220555918



③ConcurrentHashMap

特点描述:将数组的每一个表头锁住,在并发情况下若是获取了不同的表头,则可以进行同步执行。当一个线程占用锁访问其中的一个段数据时,其他段的数据也能够被其他线程访问,实现并发访问!

image-20210330135318238

Demo见demo3中的ConcurrentHashMapTest

程序描述:使用普通集合框架如HashMap在多线程下会出现安全问题,解决安全问题可以使用HashTableCollections.synchronizedMap(new HashMap<>()),对于这两个都是直接使用的synchronized关键字来保证方法的同步,效率比较低下,之后JDK1.5出现并发包引出了ConcurrentHashMap其提高了并发内容,在内部采用了segment的结构(类似于一个类Hash Table的结构,内部维护一个链表数组)。

/**
 * @ClassName ConcurrentHashMapTest
 * @Author ChangLu
 * @Date 2021/3/28 22:21
 * @Description ConcurrentHashMap解决Map的安全问题
 */
public class ConcurrentHashMapTest {
    //Map<String,String> map = new HashMap<>():使用普通的集合在多线程下会有线程安全问题(ConcurrentModificationException)
    //1、Collections.synchronizedMap(new HashMap<>()):线程安全
    //2、new ConcurrentHashMap<>():线程安全(并发包中引出的)
    private static Map<String,String> map = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        //创建10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                //每个线程存储10个
                for (int j = 0; j < 20; j++) {
                    map.put(Thread.currentThread().getName(), UUID.randomUUID().toString());
                    System.out.println(map);
                }
            }).start();
        }
    }
}

image-20210328223827184



五、Callable使用

介绍Callable

Callable接口属于java.util.concurrent包,其有①返回值。②可以抛出异常。③支持泛型自定义返回值类型。

Thread调用start()方法如何执行Callable中的call()方法

  • 由于Callable接口是使用的call()方法并包含返回值,所以需要使用一个可以说适配器吧FutureTask,该FutureTaskrun()方法内部就是调用的call()方法。

应用场景

  1. 执行多任务计算,创建一个FutureTaskList集合。
  2. 在高并发环境下确保任务只执行一次。

demo见demo3目录下的CallableTest.java

/**
 * @ClassName CallableTest
 * @Author ChangLu
 * @Date 2021/3/28 23:25
 * @Description 测试使用Callable
 */
public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<String>(new MyCallable());
        for (int i = 0; i < 10; i++) {
            new Thread(task).start();
        }

        //获取返回值
        String s = task.get();//会有阻塞
        System.out.println(s);

    }
}

class MyCallable implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("执行MyCallable()中的call()方法");
        return "调用成功!";
    }
}

image-20210328234408793

细节部分

  1. FutureTask对于call()只会执行一次。
  2. 通过使用get()方法获取返回值需要等待,有阻塞。

源码分析

①为什么一个FutureTask只执行一次call()方法?

FutureTask中使用state来保存任务的状态,初始构造器设置state状态为NEW

private static final int NEW          = 0;//任务尚未开始或处于执行期间
private static final int COMPLETING   = 1;//任务即将执行完成
private static final int NORMAL       = 2;//任务执行完毕
private static final int EXCEPTIONAL  = 3;//任务执行期间出现未捕获异常
private static final int CANCELLED    = 4;//任务被取消
private static final int INTERRUPTING = 5;//任务正在被中断
private static final int INTERRUPTED  = 6;//任务已被中断

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

run()方法中一旦执行call()方法就会改变state状态

public void run() {
    //会判断state是否为NEW,若不为null,则直接退出该方法
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行其中的Callable中的call()方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                //该方法来设置返回值
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        //将state改为NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

说明:我们可以看到当执行之后,将state更改为NORMAL,所以我们使用多线程时直接执行一次call()方法。

②为什么get()方法有阻塞

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任务没有执行完,则会进行等待执行操作
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

说明:可以看到若是状态是没有结束的情况,需要阻塞等待得到结果。



六、同步工具辅助类使用

CountDownLatch(倒计时器)

image-20210330135407268

认识CountDownLatch—减法计数器

CountDownLatch是通过一个计数器来实现的,计数的初始值一般为你线程的数量,当调用该CountDownLatch实例的await()方法的线程会进入阻塞状态,直到计数器减到0的时候,才能继续往下执行。

主要方法

  • CountDownLatch(int count):构造器,指定计数的数量。
  • CountDownLatch(int count):计数-1,一般放置在线程中。
  • void await():调用该方法的线程会进行阻塞,直到计数器为0时才会继续往下执行。
  • boolean await(long timeout, TimeUnit unit):阻塞指定的时长。

join方法比较:可以发现该工具类使用与线程的join方法很像都是等待线程完成之后执行

  1. 相对于join()方法更加灵活。
  2. 可以手动控制在n个或单个线程中使计数器进行减一操作。
  3. join()实现原理是不停的检查join线程是否存活,若是join线程存活则会当前线程永远等待,而CountDownLatch可以设置等待时长。

应用场景:启动一个服务时,主线程需要等待多个组件加载完毕之后继续执行。


实操使用

两个demo见demo4目录中的CountDownLatchTest.javaCountDownLatchTest2.java

①线程完成任务的指定数量之后,才去执行的事如进行结果汇总

/**
 * @ClassName CountDownLatchTest
 * @Author ChangLu
 * @Date 2021/3/29 16:39
 * @Description CountDownLatch:同步工具类(完成指定数量任务之后一定要做的事)
 */
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"执行了");
                latch.countDown();//计数器-1
            },String.valueOf(i)).start();
        }

        latch.await();//主线程进入阻塞状态等待锁存器计数到0,才继续往下执行
        System.out.println("close door!");
    }
}

image-20210329170340883

  • 可以看到可以将计数器-1操作放置到线程中去,指定数量的线程完成任务之后再执行结果汇总操作。

②秒杀场景,如模拟高并发,让一组线程在指定时刻(秒杀时间)来进行抢购。

依旧可以通过使用CountDownLatch来去实现,只不过这里阻塞等待放置在线程中:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName CountDownLatchTest2
 * @Author ChangLu
 * @Date 2021/3/29 17:16
 * @Description CountDownLatch使用2:秒杀场景(倒计时开抢)
 */
public class CountDownLatchTest2 {
    public static void main(String[] args) throws InterruptedException {
        //初始计数为1
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    latch.await();//进行等待操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"开始执行!");
            }).start();
        }

        //模拟倒计时2秒
        for (int i = 3; i >=1; i--) {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(i);
        }
        System.out.println("开抢!!!");
        //计数-1指挥线程开始行动
        latch.countDown();
    }

image-20210329172136586

注意点

  1. 若是计数器始终没有到0会进入阻塞状态,可以使用await()的带参方法。


CyclicBarrier(循环珊栏)

image-20210330135429666

认识CyclicBarrier

CyclicBarrier(循环屏障):通过设置一个珊栏可以拦截指定数量的线程,对应线程会进入阻塞,拦截到指定数量之后会先去执行CyclicBarrier中的run()方法之后再将珊栏中的线程全部唤醒。

  • 做的事情就是,让一组线程到达一个屏障(同步点)时被阻塞,知道指定数量的最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行对应任务。

方法

  • CyclicBarrier(int parties, Runnable barrierAction):有参构造器,设置循环拦截的指定数量以及添加一个runnable()方法。
  • int await():计数器会-1,没有满指定数量的线程会进入阻塞状态,一旦计数器为0时先执行指定的runnable中的方法,接着唤醒其他数量的线程。

image-20210329182146494

  • 珊栏会阻塞指定数量的线程,当线程数量达到时会先执行其指定的任务,接着唤醒其他所有的线程。

实操:demo见demo4目录中的CyclicBarrierTest.java

下面是对CyclicBarrier工具类的测试使用,将每个线程当做为执行一个任务,执行await()时线程会进入阻塞,当达到珊栏中的指定数量线程之后会先执行珊栏中的任务接着唤醒其他线程,若是还有其他线程的话就会继续下一轮珊栏:

/**
 * @ClassName CyclicBarrierTest
 * @Author ChangLu
 * @Date 2021/3/29 17:51
 * @Description CyclicBarrier(循环屏障)使用
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        //创建一个循环屏障,设置为7
        CyclicBarrier barrier = new CyclicBarrier(7, () -> {
            System.out.println("集齐七颗龙珠,终极进化!");
        });

        for (int i = 1; i < 8; i++) {
            //i属于局部变量存在于栈,在下面Lambda表达式中若想要使用该变量应该将其存放于一个final变量中
            //闭包,jdk1.8可以不写final,jvm会默认加上去的
            int temp = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"集齐了第"+temp+"龙珠");
                try {
                    //等待有7个线程执行这个等待方法时一起通过
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"结束");
            }).start();
        }
    }
}

image-20210329182313922

注意点:若是指定线程数量不是珊栏设置的个数倍数,那么其他的线程很有可能会一直在阻塞状态。



semaphore(信号量)

image-20210330135509084

认识semaphore

semaphore(信号量,即操作系统中的pv操作):该类可以使同一段时间内指定数量得到线程。需要通过该信号量进行获取与释放。

  • 可以控制同时访问资源的线程个数,如实现一个文件允许的并发数量。

主要方法

  • Semaphore(int permits):有参构造,设置许可证的数量。
  • void acquire():获得的意思,从信号量中获取一个许可证,若是许可证满了该线程进入阻塞直到其他线程释放许可证为止。
  • void release():释放一个许可(+1)将其返回给信号量,之后唤醒等待的线程。
  • int availablePermits():返回此信号量中当前可用的许可数。
  • boolean hasQueuedThreads():查询是否有线程正在等待获取。

应用场景:限流(SpringCloud)。


示例:demo见demo4目录中的SemaphoreTest.java

(1)、假设信号量为停车场,提供三个停车位(达到限制目的),之后模拟多辆车来占用释放停车位:

/**
 * @ClassName SemaphoreTest
 * @Author ChangLu
 * @Date 2021/3/29 18:52
 * @Description Semaphore(信号量)简单使用
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);//通过有参构造器创建指定数量的许可证
        //创建8个线程(相当于8辆车去占3个车位)
        for (int i = 0; i < 8; i++) {
            new Thread(()->{
                try {
                    //首先需要发布
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到了停车位");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + "归还了停车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //释放掉许可证
                semaphore.release();
            }).start();
        }
    }
}

image-20210329191615869

注意:若是某个线程来获取许可证,许可证始终为满时,该线程可能会一直在阻塞状态。


(2)、可通过使用tryAcquire()来进行尝试获取许可证,若是没有获取到可做其他事情,修改下上面的例子

Semaphore semaphore = new Semaphore(3);//通过有参构造器创建指定数量的许可证
for (int i = 0; i < 8; i++) {
    new Thread(()->{
        //尝试获取许可证
        if(semaphore.tryAcquire(){
            System.out.println(Thread.currentThread().getName() + "抢到了停车位");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "归还了停车位");
            //释放掉许可证
            semaphore.release();
        }else{
            //若是没有获取到许可证则做其他事情...
        }
   }).start();
}


总结

1、CountDownLatch可以看作一个减法计数器,可指定线程执行的数量,若是使用countDown()方法计数器就会减一,使用await()方法的线程会进入阻塞等待直到计数器为0时执行下面的操作。应用场景如:需完成指定任务执行重要的结果汇总、秒杀、主线程需要等待几个组件完成之后再执行。

2、CyclicBarrier珊栏屏障,每个屏障拦截指定数量的线程(通过await()方法),被拦截下来的线程会进入阻塞状态,一旦达到屏障中达到了指定数量的线程就会先执行屏障自带的方法接着释放其他阻塞线程,循环执行。应用场景:循环指定个数的线程执行并进行总的执行方法。

3、semaphore信号量,可设置指定数量的许可证,同一时间内只有指定数量的线程能够获取到许可证,其他线程若想要获取许可证的话需要其他线程先释放许可证之后才能执行。应用场景:限流。



七、读写锁(ReentrantReadWriteLock)

7.1、初识ReentrantReadWriteLock

image-20210330135618497

介绍:ReentrantReadWriteLock除了提供读锁、写锁及其各自释放与获取外,还提供了一些其他和锁状态有关的方法

一般在读多写少的场景应用:

  • 读操作(共享锁):同一时间允许多个线程对同一共享资源进行读操作。同一时刻所有线程的写操作会被阻塞。
  • 写操作(排他锁):同一时间允许一个线程对同一共享资源进行写操作。同一时刻其他线程的读操作会被阻塞。

image-20210330140140751

  • 获取读锁(上锁最大数量为2^16^-1),写锁(上锁最大数量为2^16^-1))之后,同样使用lock()unlock()来上锁与释放锁。
//无参、有参构造器。默认使用的是非公平锁
public ReentrantReadWriteLock() {
	this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}


7.2、程序示例

示例:demo见demo4中的ReadWriteLockTest

使用不同线程来进行读写操作:

/**
 * @ClassName ReadWriteLockTest
 * @Author ChangLu
 * @Date 2021/3/30 11:23
 * @Description 读写锁ReadWriteLock测试
 */
public class ReadWriteLockTest {
    public static void main(String[] args) {
        WRdata2 wRdata = new WRdata2();
        for (int i = 1; i <= 3; i++) {
            //闭包获取外部变量需要设置为final,对于这里int temp=i,jdk1.8默认会加上final
            int temp = i;
            new Thread(()->{
                for (int j=(temp-1)*10;j<=(temp-1)*10+10;j++){
                    //进行写操作
                    wRdata.write(j);
                }
            }).start();
        }

        new Thread(()->{
            for (int i = 1; i <= 30; i++) {
                wRdata.read(i);
            }
        }).start();

    }
}

//使用读写锁
class WRdata2{
    //volatile保证可见性与防止指令重排
    private volatile Map<String,String> map = new HashMap<>();
    //创建一个读写锁实例(默认是非公平锁)
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    //分别获取读锁、写锁
    private Lock readLock = lock.readLock();
    private Lock writeLock = lock.writeLock();

    //写操作
    public void write(int i){
        writeLock.lock();//上锁(写锁)
        try {
            System.out.println(Thread.currentThread().getName()+"执行写入操作,写入"+i);
            map.put(String.valueOf(i),Thread.currentThread().getName()+"执行"+i);
            System.out.println(Thread.currentThread().getName()+"写入"+i+"的操作完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();//释放锁(写锁)
        }
    }

    //读操作
    public void read(int i){
        readLock.lock();//上锁(写锁)
        try {
            String o = map.get(String.valueOf(i));
            System.out.println(Thread.currentThread().getName()+"读取到key="+i+"为"+o);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();//释放锁(读锁)
        }
    }

}

image-20210330142002720

注意:读锁与写锁上锁的数量不应超过2^16^-1(65535)。



八、阻塞队列

8.1、认识阻塞队列

阻塞队列都是使用的BlockingQueue接口:实现了Queue接口,也属于Collection容器

  • 阻塞思想:无法得到需要的资源时,线程等待资源进行阻塞,等到资源可用时唤醒。

image-20210330150423263

一般在多线程并发处理,线程池使用阻塞队列。



8.2、两个阻塞队列实现类

①ArrayBlockingQueue

介绍:数组阻塞队列,其中的方法内部都是上锁的,并发下是安全的。

image-20210330150223685

其中包含了四套API

  1. 抛出异常
  2. 不会抛出异常
  3. 阻塞等待
  4. 超时等待
方式 ①抛出异常 ②不抛出异常 ③阻塞等待 ④超时等待
添加操作 add(E e) offer(E e) put(E e) offer(E e, long timeout, TimeUnit unit)
移除操作 remove() poll() take() poll(long timeout, TimeUnit unit)
检测队首元素 element() peek()
  • 添加操作中②④都有返回值true或false,①是返回true或抛异常IllegalStateException
  • 移除操作中②③④都有返回值元素或null,①是返回元素或抛异常NoSuchElementException
  • 检查队首元素①返回元素或抛异常NoSuchElementException,②是返回元素或null

说明:对于①实际上内部实际上就是使用的offer()方法,其余的方法内部都是上锁的,是线程安全的。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。