参考答案
为了应对不同的业务场景,BlockingQueue 提供了4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每组方法的表现是不同的。这些方法如下:
- | 抛异常 | 特定值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() |
四组不同的行为方式含义如下:
参考答案
BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于put与take操作的原理是类似的。下面以ArrayBlockingQueue为例,来说明BlockingQueue的实现原理。
首先看一下ArrayBlockingQueue的构造函数,它初始化了put和take函数中用到的关键成员变量,这两个变量的类型分别是ReentrantLock和Condition。ReentrantLock是AbstractQueuedSynchronizer(AQS)的子类,它的newCondition函数返回的Condition实例,是定义在AQS类内部的ConditionObject类,该类可以直接调用AQS相关的函数。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。我们会发现put函数使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的机制,即先获得锁,再等待,而不是synchronized和wait的机制。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
再来看一下消费者调用的take函数,take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
扩展阅读
await操作:
signal操作:
signal函数的示意图如下所示。