阻塞队列,主要操作有两个,一个是put放入元素,另一个是take取出元素。所谓的阻塞就是当多个线程同时存取数据时,如果遇到队列为空或者队列为满时,会发生阻塞。并且多个线程同时执行take或者put操作时,某一时刻只有一个线程获得执行权利,也就是执行任何一个操作之前需要获得锁,没有获得锁的线程发生阻塞。
put: 向队列中存入一个元素,如果已满,则阻塞当前线程,等待唤醒。如果正常存入了元素,那么唤醒其他阻塞的线程(有些执行take操作的线程因为队列为空而阻塞)
take: 从队列中取一个元素,如果队列为空,则阻塞当前线程,等待唤醒。如果正常取出了元素,那么唤醒其他阻塞的线程(有些执行put操作的线程因为队列满而阻塞)
Object类提供了几个操作来进行当前线程的唤醒和阻塞。 wait: 阻塞当前线程,其实就是将当前线程放入当前对象的等待集中,释放锁(如果持有锁的话),暂停当前线程。 notify: 唤醒当前对象等待集上的一个线程。 notifyAll: 唤醒当前对象等待集上的所有线程。
基于以上,我们实现一个自己的阻塞队列:
public class MyBlockingQueue1<T> implements MyBlockingQueue<T>{
private Object[] array;
private int count=0;
private int getIndex=0;
private int putIndex=0;
public MyBlockingQueue1(int cap){
array = new Object[cap];
}
public synchronized void put(T ele) throws InterruptedException {
while (isFull()){
wait();
}
array[putIndex++]=ele;
if (putIndex>=array.length){
putIndex=0;
}
count++;
notifyAll();
}
public synchronized T take() throws InterruptedException {
while (isEmpty()){
wait();
}
Object element = array[getIndex++];
if (getIndex>=array.length){
getIndex=0;
}
count--;
notifyAll();
@SuppressWarnings("unchecked")
T t = (T)element;
return t;
}
private boolean isEmpty(){
return count==0;
}
private boolean isFull(){
return count>=array.length;
}
}
put和take方法都加了synchronized,也就是说这两个方法执行之前都需要先取得同一个对象锁,从而,这两个方法就不可以并行执行。于是我们可以稍微优化一下,比如put和take使用两个不同的锁,这两个操作就不会互相影响了。但也会因此使得count成为了临界资源,count++会发生竞态,我们可以考虑使用一个原子变量类来替代int类型。而且上面介绍提到的唤醒部分,每当成功put或者成功take,我们都唤醒所有线程,其实put操作成功时,我们只想唤醒那些因为队列为空而阻塞的线程,take操作成功时,我们只想唤醒那些因为队列已满而阻塞的线程,而且唤醒一个就够了。于是我们可以使用Condition来使得线程在两个不同的等待队列上进行等待,每次都唤醒特定队列上的一个线程。于是0.2版代码如下:
public class MyBlockingQueue2<T> implements MyBlockingQueue<T>{
private Object[] array;
private AtomicInteger count=new AtomicInteger(0);//临界资源,使用原子变量类
private int getIndex=0;
private int putIndex=0;
private ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = putLock.newCondition();//防止过早唤醒
private ReentrantLock takeLock = new ReentrantLock();
private final Condition notFull = takeLock.newCondition();//防止过早唤醒
public MyBlockingQueue2(int cap){
array = new Object[cap];
}
public void put(T ele) throws InterruptedException {
try {
putLock.lock();
while (isFull()){
notFull.await();
}
array[putIndex++]=ele;
if (putIndex>=array.length){
putIndex=0;
}
int c = count.getAndIncrement();
if (c==0){
notEmpty.signal();
}
}finally {
putLock.unlock();
}
}
public T take() throws InterruptedException {
try {
takeLock.lock();
while (isEmpty()){
notEmpty.wait();
}
Object element = array[getIndex++];
if (getIndex>=array.length){
getIndex=0;
}
int c = count.getAndDecrement();
if (c==array.length){
notFull.signal();
}
@SuppressWarnings("unchecked")
T t = (T)element;
return t;
}finally {
takeLock.unlock();
}
}
private boolean isEmpty(){
return count.get()==0;
}
private boolean isFull(){
return count.get()>=array.length;
}
}
我们自己写的这个阻塞队列只是实现了最基本的put和take两个操作,而jdk中的阻塞队列提供的功能更加全面一些。首先,提供了put和take对应的非阻塞方法offer和poll,这两个方法,即使遇到队列为满或为空的情况,也不会阻塞当前线程,而是直接返回false或null。并且还提供了阻塞时间选项,比如,poll时,如果队列为空,可以选择阻塞x秒,如果x秒内还是没能拿到元素,则返回null。其次还提供了比如drainTo、contains、remove等方法来完成一次性取出所有元素,判断元素存在与否,移除一个元素等操作,作为阻塞队列的接口BlockingQueue主要有四个实现类: ArrayBlockingQueue:这个是用数组实现的一个阻塞队列,put和take使用了同一个锁,线程等待队列使用了Condition。 LinkedBlockingQueue:这个是用链表实现的一个阻塞队列,put和take使用了 两个锁,理论上支持更大的并发量。 还有就是PriorityBlockingQueue和SynchronousQueue,一个是优先级阻塞队列,每次都按照优先级来存取元素,另一个是同步队列,其实它内部没有维护队列,而是存入一个元素之后,必须有其他线程将他取走,不然再想put的线程就会被阻塞。这两个队列内部实现跟前两个有所不同,看起来要更复杂一点,比如PriorityBlockingQueue内部是通过堆来维护优先级的,优先级比对我我们可以存入自己的比较器,而SynchronousQueue内部通过Transferer分装了一些操作,这两个队列待独立一篇细说。