前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >阻塞队列 BlockingQueue 我也不会啊

阻塞队列 BlockingQueue 我也不会啊

作者头像
韩旭051
发布2021-04-14 15:16:35
7440
发布2021-04-14 15:16:35
举报
文章被收录于专栏:刷题笔记

阻塞队列BlockingQueue

BlockingQueue中有哪些方法,为什么这样设计?

参考答案

为了应对不同的业务场景,BlockingQueue 提供了4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每组方法的表现是不同的。这些方法如下:

-

抛异常

特定值

阻塞

超时

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

移除

remove()

poll()

take()

poll(time, unit)

检查

element()

peek()

四组不同的行为方式含义如下:

  • 抛异常:如果操作无法立即执行,则抛一个异常;
  • 特定值:如果操作无法立即执行,则返回一个特定的值(一般是 true / false)。
  • 阻塞:如果操作无法立即执行,则该方法调用将会发生阻塞,直到能够执行;
  • 超时:如果操作无法立即执行,则该方法调用将会发生阻塞,直到能够执行。但等待时间不会超过给定值,并返回一个特定值以告知该操作是否成功(典型的是true / false)。

BlockingQueue是怎么实现的?

参考答案

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于put与take操作的原理是类似的。下面以ArrayBlockingQueue为例,来说明BlockingQueue的实现原理。

首先看一下ArrayBlockingQueue的构造函数,它初始化了put和take函数中用到的关键成员变量,这两个变量的类型分别是ReentrantLock和Condition。ReentrantLock是AbstractQueuedSynchronizer(AQS)的子类,它的newCondition函数返回的Condition实例,是定义在AQS类内部的ConditionObject类,该类可以直接调用AQS相关的函数。

代码语言:javascript
复制
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。我们会发现put函数使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的机制,即先获得锁,再等待,而不是synchronized和wait的机制。

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

再来看一下消费者调用的take函数,take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。

代码语言:javascript
复制
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

扩展阅读

await操作:

  • 我们发现ArrayBlockingQueue并没有使用Object.wait,而是使用的Condition.await,这是为什么呢?Condition对象可以提供和Object的wait和notify一样的行为,但是后者必须先获取synchronized这个内置的monitor锁才能调用,而Condition则必须先获取ReentrantLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是Condition的等待可以中断,这是二者唯一的区别。
  • 我们先来看一下Condition的await函数,await函数的流程大致如下图所示。await函数主要有三个步骤,一是调用addConditionWaiter函数,在condition wait queue队列中添加一个节点,代表当前线程在等待一个消息。然后调用fullyRelease函数,将持有的锁释放掉,调用的是AQS的函数。最后一直调用isOnSyncQueue函数判断节点是否被转移到sync queue队列上,也就是AQS中等待获取锁的队列。如果没有,则进入阻塞状态,如果已经在队列上,则调用acquireQueued函数重新获取锁。

signal操作:

  • signal函数将condition wait queue队列中队首的线程节点转移等待获取锁的sync queue队列中。这样的话,await函数中调用isOnSyncQueue函数就会返回true,导致await函数进入最后一步重新获取锁的状态。
  • 我们这里来详细解析一下condition wait queue和sync queue两个队列的设计原理。condition wait queue是等待消息的队列,因为阻塞队列为空而进入阻塞状态的take函数操作就是在等待阻塞队列不为空的消息。而sync queue队列则是等待获取锁的队列,take函数获得了消息,就可以运行了,但是它还必须等待获取锁之后才能真正进行运行状态。
  • signal函数其实就做了一件事情,就是不断尝试调用transferForSignal函数,将condition wait queue队首的一个节点转移到sync queue队列中,直到转移成功。因为一次转移成功,就代表这个消息被成功通知到了等待消息的节点。

signal函数的示意图如下所示。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/03/19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 阻塞队列BlockingQueue
  • BlockingQueue中有哪些方法,为什么这样设计?
  • BlockingQueue是怎么实现的?
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档