java并发编程(五)
30:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue
PriorityBlockingQueue,DelayQueue
阻塞队列:在某些情况下,会挂起线程,一旦条件满足,被挂起的线程会自动唤醒。而阻塞队列无需关心什么时候阻塞,什么时候唤醒。
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界阻塞队列
SynchronousQueue:不存储元素的阻塞队列,即单个元素队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
DelayQueue:使用优先级队列实现的延迟无界阻塞队列
阻塞队列的通用Api
抛异常 特殊值 阻塞 超时
插入add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() X X
demo
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "B").start();
}
}
SynchronousQueue:不存储元素的阻塞队列,即单个元素队列,每次put进一个元素,另一个线程5秒后取值,每次take到的都是put进的值
Connected to the target VM, address: '127.0.0.1:53549', transport: 'socket'
A put 1
B get1
A put 2
B get2
A put 3
B get3
Disconnected from the target VM, address: '127.0.0.1:53549', transport: 'socket'
当队列满时,你还add,就会抛异常,队列满了
当队列空时,你还remove,就会抛异常,没有元素给你
//ArrayBlockingQueue 容量仅为2 放入3个 B能取到吗?能取到几个?
public class BlockingQueueDemo {
public static void main(String[] args) {
//BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2);
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.add("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.add("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.add("3");
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "B").start();
}
}
A put 1
A put 2
A put 3
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at com.kk.thread.BlockingQueueDemo.lambda$main$0(BlockingQueueDemo.java:25)
at java.lang.Thread.run(Thread.java:748)
B get1
B get2
B getnull
问题:那么当ArrayBlockingQueue用put放入元素时会报full queu 异常吗?为什么?
将Add换成put,虽然限制容量为2,但是继续放元素,队列会阻塞直到put数据,B仍能取到所有元素
看下ArraBlockingQueue的put源码
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*在队列尾部插入指定元素,等待队列已满时空间变为可用
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);//检查是否元素为null
final ReentrantLock lock = this.lock;//可重入锁
lock.lockInterruptibly();//实际判断的是Thread.interrupted()
try {
while (count == items.length)队列元素满了就等待
notFull.await();
enqueue(e);否则进入队列,调用singal方法通知线程
} finally {
lock.unlock();
}
}
放入1
检查线程是否中断
设置线程状态
放入队列
此时调用unlock waitstatus=-1
而unlock最终会unpark当前线程
如果线程被阻塞*{@code park}然后它将解除锁定。保证下一个调用
到{@code park}不会阻塞
也就是说put方法在队列满了以后会阻塞,等待没满的唤醒
31:CountDownLatch,CyclicBarrier,Semaphore
CountDownLatch:让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒
public class CountDownLatchDemo
{
public static void main(String[] args) throws InterruptedException
{
CountDownLatch latch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 车次离开");
latch.countDown();
}, String.valueOf(i)).start();
}
latch.await();
System.out.println(Thread.currentThread().getName()+"\t 锁定");
}
}
1 车次离开
2 车次离开
3 车次离开
4 车次离开
5 车次离开
6 车次离开
main 锁定
CyclicBarrier:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才
会继续干活。
public class CyclicBarrierDemo
{
private static final int NUMBER = 7;
public static void main(String[] args)
{
//CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier cb = new CyclicBarrier(NUMBER, new Runnable() {
@Override
public void run() {
System.out.println("车次"+NUMBER+"已满");
}
});
for (int i = 1; i <=7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t 车次到达");
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
1 车次到达
2 车次到达
3 车次到达
4 车次到达
5 车次到达
6 车次到达
7 车次到达
车次7已满
Semaphore:一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制,互斥,当一个线程使用资源时获取信号量,互斥线程需要等待,线程释放,将信号量加1,唤醒互斥资源
public class SemaphoreDemo
{
public static void main(String[] args)
{
//3个停车位
Semaphore semaphore = new Semaphore(3);
//模拟6部汽车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t 号停入停车位");
TimeUnit.SECONDS.sleep(3);
semaphore.release();
System.out.println(Thread.currentThread().getName()+"\t 号离开停车位");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
1 号停入停车位
2 号停入停车位
3 号停入停车位
4 号停入停车位
2 号离开停车位
3 号离开停车位
5 号停入停车位
6 号停入停车位
1 号离开停车位
4 号离开停车位
5 号离开停车位
6 号离开停车位
32:Exchanger交换器
一个同步点,在这个点上线程可以成对地交换元素。每个线程在{@link#exchange}方法的条目上显示一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。交换器可以看作{@link SynchronousQueue}的双向形式。Exchangers may be useful in applications such as genetic algorithms and pipeline designs,应用于遗传算法和管道设计
A synchronization point at which threads can pair and swap elements
* within pairs. Each thread presents some object on entry to the
* {@link #exchange exchange} method, matches with a partner thread,
* and receives its partner's object on return. An Exchanger may be
* viewed as a bidirectional form of a {@link SynchronousQueue}.
* Exchangers may be useful in applications such as genetic algorithms
* and pipeline designs.
用于两个线程之间的数据交换
public class ExchangerDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(()->{
String data1="int 1";
exchangeDate(data1, exchanger);
});
executor.execute(()->{
String data1="int 2";
exchangeDate(data1, exchanger);
});
executor.shutdown();
}
private static void exchangeDate(String data1, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "交换数据" + data1 + " 交换");
Thread.sleep(5L);
String data2 = (String) exchanger.exchange(data1);
System.out.println(Thread.currentThread().getName() + "交易得到" + data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-1交换数据int 1 交换
pool-1-thread-2交换数据int 2 交换
pool-1-thread-1交易得到int 2
pool-1-thread-2交易得到int 1
- 点赞
- 收藏
- 关注作者
评论(0)