java并发编程(五)

举报
赵KK日常技术记录 发表于 2023/06/24 12:54:24 2023/06/24
【摘要】 30:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueuePriorityBlockingQueue,DelayQueue阻塞队列:在某些情况下,会挂起线程,一旦条件满足,被挂起的线程会自动唤醒。而阻塞队列无需关心什么时候阻塞,什么时候唤醒。ArrayBlockingQueue:由数组结构组成的有界阻塞队列LinkedBlocking...

30:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue

PriorityBlockingQueue,DelayQueue

阻塞队列:在某些情况下,会挂起线程,一旦条件满足,被挂起的线程会自动唤醒。而阻塞队列无需关心什么时候阻塞,什么时候唤醒。

请在此添加图片描述

ArrayBlockingQueue:由数组结构组成的有界阻塞队列

LinkedBlockingQueue:由链表结构组成的有界阻塞队列

SynchronousQueue:不存储元素的阻塞队列,即单个元素队列

PriorityBlockingQueue:支持优先级排序的无界阻塞队列

DelayQueue:使用优先级队列实现的延迟无界阻塞队列

阻塞队列的通用Api

  抛异常       特殊值     阻塞        超时

插入add(e) offer(e) put(e) offer(e,time,unit)

移除 remove() poll() take() poll(time,unit)

检查 element() peek() X X

demo

public class BlockingQueueDemo {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

    }


}

SynchronousQueue:不存储元素的阻塞队列,即单个元素队列,每次put进一个元素,另一个线程5秒后取值,每次take到的都是put进的值

Connected to the target VM, address: '127.0.0.1:53549', transport: 'socket'
A   put 1
B   get1
A   put 2
B   get2
A   put 3
B   get3
Disconnected from the target VM, address: '127.0.0.1:53549', transport: 'socket'

当队列满时,你还add,就会抛异常,队列满了

当队列空时,你还remove,就会抛异常,没有元素给你

//ArrayBlockingQueue 容量仅为2 放入3个 B能取到吗?能取到几个?

public class BlockingQueueDemo {

    public static void main(String[] args) {
        //BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
      
        ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2);
        new Thread(() -> {

            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.add("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.add("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.add("3");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, "A").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

    }
}
A   put 1
A   put 2
A   put 3
java.lang.IllegalStateException: Queue full
  at java.util.AbstractQueue.add(AbstractQueue.java:98)
  at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
  at com.kk.thread.BlockingQueueDemo.lambda$main$0(BlockingQueueDemo.java:25)
  at java.lang.Thread.run(Thread.java:748)
B   get1
B   get2
B   getnull

问题:那么当ArrayBlockingQueue用put放入元素时会报full queu 异常吗?为什么?

将Add换成put,虽然限制容量为2,但是继续放元素,队列会阻塞直到put数据,B仍能取到所有元素

看下ArraBlockingQueue的put源码

/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *在队列尾部插入指定元素,等待队列已满时空间变为可用
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);//检查是否元素为null
        final ReentrantLock lock = this.lock;//可重入锁
        lock.lockInterruptibly();//实际判断的是Thread.interrupted()
        try {
            while (count == items.length)队列元素满了就等待
                notFull.await();
            enqueue(e);否则进入队列,调用singal方法通知线程
        } finally {
            lock.unlock();
        }
    }

放入1

请在此添加图片描述

检查线程是否中断

请在此添加图片描述

请在此添加图片描述

设置线程状态

请在此添加图片描述

放入队列

请在此添加图片描述

此时调用unlock waitstatus=-1

请在此添加图片描述

而unlock最终会unpark当前线程

请在此添加图片描述

如果线程被阻塞*{@code park}然后它将解除锁定。保证下一个调用

到{@code park}不会阻塞

也就是说put方法在队列满了以后会阻塞,等待没满的唤醒

31:CountDownLatch,CyclicBarrier,Semaphore

CountDownLatch:让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒

public class CountDownLatchDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    
    CountDownLatch latch = new CountDownLatch(6);
    for (int i = 1; i <= 6; i++) {
      new Thread(() -> {
        System.out.println(Thread.currentThread().getName()+"\t 车次离开");
        latch.countDown();
      }, String.valueOf(i)).start();
    }
    latch.await();
    System.out.println(Thread.currentThread().getName()+"\t 锁定");
  }

}
1   车次离开
2   车次离开
3   车次离开
4   车次离开
5   车次离开
6   车次离开
main   锁定

CyclicBarrier:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,

直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才

会继续干活。

public class CyclicBarrierDemo
{
  private static final int NUMBER = 7;
  
  public static void main(String[] args)
  {
    //CyclicBarrier(int parties, Runnable barrierAction) 
    CyclicBarrier cb = new CyclicBarrier(NUMBER, new Runnable() {
      @Override
      public void run() {
        System.out.println("车次"+NUMBER+"已满");
      }
    });
    
    for (int i = 1; i <=7; i++) {
      new Thread(() -> {
        try {
          System.out.println(Thread.currentThread().getName()+"\t 车次到达");
          cb.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }, String.valueOf(i)).start();
    }
  }
1   车次到达
2   车次到达
3   车次到达
4   车次到达
5   车次到达
6   车次到达
7   车次到达
车次7已满

Semaphore:一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制,互斥,当一个线程使用资源时获取信号量,互斥线程需要等待,线程释放,将信号量加1,唤醒互斥资源

public class SemaphoreDemo
{
  public static void main(String[] args)
  {
    //3个停车位
    Semaphore semaphore = new Semaphore(3);
    //模拟6部汽车
    for (int i = 1; i <= 6; i++) {
      new Thread(() -> {
        
        try {
          semaphore.acquire();
          System.out.println(Thread.currentThread().getName()+"\t 号停入停车位");
          TimeUnit.SECONDS.sleep(3);
          semaphore.release();
          System.out.println(Thread.currentThread().getName()+"\t 号离开停车位");
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }

      }, String.valueOf(i)).start();
    }
    
  }
}

1 号停入停车位

2 号停入停车位

3 号停入停车位

4 号停入停车位

2 号离开停车位

3 号离开停车位

5 号停入停车位

6 号停入停车位

1 号离开停车位

4 号离开停车位

5 号离开停车位

6 号离开停车位

32:Exchanger交换器

一个同步点,在这个点上线程可以成对地交换元素。每个线程在{@link#exchange}方法的条目上显示一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。交换器可以看作{@link SynchronousQueue}的双向形式。Exchangers may be useful in applications such as genetic algorithms and pipeline designs,应用于遗传算法和管道设计

A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.

用于两个线程之间的数据交换

public class ExchangerDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        final Exchanger exchanger = new Exchanger();
        executor.execute(()->{
            String data1="int 1";
            exchangeDate(data1, exchanger);
        });
        executor.execute(()->{
            String data1="int 2";
            exchangeDate(data1, exchanger);
        });
        executor.shutdown();
    }

    private static void exchangeDate(String data1, Exchanger exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + "交换数据" + data1 + " 交换");
            Thread.sleep(5L);
            String data2 = (String) exchanger.exchange(data1);
            System.out.println(Thread.currentThread().getName() + "交易得到" + data2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
pool-1-thread-1交换数据int 1 交换
pool-1-thread-2交换数据int 2 交换
pool-1-thread-1交易得到int 2
pool-1-thread-2交易得到int 1
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。