源码角度了解阻塞队列之DelayQueue

举报
周杰伦本人 发表于 2022/09/29 23:48:57 2022/09/29
【摘要】 源码角度了解阻塞队列之DelayQueueDelayQueue是延迟队列,在延迟到期的时候才能被获取到,按照到期时间从小到大以二叉堆的形式排列,时间最短的在堆顶的位置,它的成员遍历包括一个Condition条件,一个PriorityQueue队列,一个锁,只允许一个线程生产或消费,还有一个记录leader的线程对象。 put()方法它的put()方法同样也是直接调用了offer()方法,我...

源码角度了解阻塞队列之DelayQueue

DelayQueue是延迟队列,在延迟到期的时候才能被获取到,按照到期时间从小到大以二叉堆的形式排列,时间最短的在堆顶的位置,它的成员遍历包括一个Condition条件,一个PriorityQueue队列,一个锁,只允许一个线程生产或消费,还有一个记录leader的线程对象。

put()方法

它的put()方法同样也是直接调用了offer()方法,我们看一下offer():

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
  1. 先进行加锁
  2. 调用PriorityQueue的offer()方法取元素
  3. 判断放入的元素如果在堆顶,leader设置为空,唤醒其他线程来消费
  4. 返回true
  5. 最后再进行解锁

take()方法

take()方法的整体功能是检索并取出头部的元素,如果有需要的话等待将要到期的元素,下面我们看一下它的take()方法的源码:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
  1. 先进行加锁
  2. for循环,取出堆顶的元素,也就是到期时间最接近的元素
  3. 判断如果第一个元素为空,显然队列为空,就阻塞等待
  4. 否则获取第一个元素的延迟时间,如果这个元素的延迟时间小于等于0,说明已经到期,就可以从队列中取出来了,调用q.poll()进行出栈
  5. 如果延迟时间大于0,说明还没有到期,然后判断leader是否为空,如果不为空的话表示其他线程也在等待这个元素,就一直等待
  6. 如果leader为空,就把当前线程设置给leader变量,同时设置等待时间为延迟队列元素的到期时间,最后到期后如果leader等于当前线程,再设置为空
  7. 最后如果当前线程是leader并且判断还有元素,唤醒其他线程来取元素
  8. 最后进行解锁

总结

这篇文章主要介绍了DelayQueue的put()方法和take()方法,它的成员变量有PriorityQueue,放入元素和获取元素都是调用PriorityQueue实例的方法,它主要按照延迟时间来控制什么时候将元素出队的。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。