源码角度了解阻塞队列之SynchronousQueue
【摘要】 源码角度了解阻塞队列之SynchronousQueueSynchronousQueue是一个同步队列,它没有任何的容量,插入操作都必须等待另一个线程的相应删除操作从它的构造方法中我们可以看到,可以指定是否为公平的队列,如果是公平的使用队列,如果不是公平的,使用栈来存储 put()方法public void put(E e) throws InterruptedException { ...
源码角度了解阻塞队列之SynchronousQueue
SynchronousQueue是一个同步队列,它没有任何的容量,插入操作都必须等待另一个线程的相应删除操作
从它的构造方法中我们可以看到,可以指定是否为公平的队列,如果是公平的使用队列,如果不是公平的,使用栈来存储
put()方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
我们可以看到它的put()方法没有什么逻辑,主要调用Transfererd的transfer()方法来放入元素
transfer()方法
Transfererd是一个抽象类,它的实现类有TransferQueue和TransferStack,分别是公平队列的实现和非公平队列的实现
我们先看一下TransferQueue的transfer()方法的实现逻辑
TransferQueue的transfer()方法
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
代码有点长,我们分析一下:
- isData代表是生产者模式还是消费者模式,如果传入的元素e是空的话显然是消费者模式
- for循环,初始化QNode的头结点和尾结点,如果为空的话说明没有初始化好,继续for循环进行初始化
- 如果头尾节点相同或者是尾部节点模式和传入的模式相同的话,添加到尾部
- 新建节点,添加到尾部,然后调用awaitFulfill()方法进入阻塞状态
- 当线程唤醒发现在等待队列中第一个的时候,返回元素
- 如果模式不相同的话,比如来的是消费者,而等待队列全是生产者,这时候与队列中的第一个元素进行配的,配对成功出队列,同时调用LockSupport.unpark(m.waiter)来唤醒生产者
TransferStack的transfer()方法
- 同样判断是否为同种模式,如果是同种模式,入栈 阻塞
- 如果不是同种模式,进行配对一起出栈
take()方法
SynchronousQueue的take()方法也是调用transferer的transfer()方法来获取元素,然后返回元素
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
总结
SynchronousQueue有公平队列和非公平队列两种实现方式,公平队列采用队列实现先进先出,非公平队列采用栈实现,先进后出,它的最大特点就是生产者来了需要阻塞,当对应的消费者来之后匹配成功才会被唤醒,这个SynchronousQueue产生阻塞,效率不高,一般不怎么使用
至此,我们的阻塞队列的实现基本介绍了,有ArrayBlockingQueue、LinkedBlockingQueue、PriorityQueue、DelayQueue和SynchronousQueue
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)