前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >阻塞队列中的线程协作(阻塞、唤醒、锁)

阻塞队列中的线程协作(阻塞、唤醒、锁)

作者头像
naget
发布2019-07-30 11:24:02
1.2K0
发布2019-07-30 11:24:02
举报
文章被收录于专栏:Vegout

自己写一个阻塞队列

阻塞队列,主要操作有两个,一个是put放入元素,另一个是take取出元素。所谓的阻塞就是当多个线程同时存取数据时,如果遇到队列为空或者队列为满时,会发生阻塞。并且多个线程同时执行take或者put操作时,某一时刻只有一个线程获得执行权利,也就是执行任何一个操作之前需要获得锁,没有获得锁的线程发生阻塞。

put: 向队列中存入一个元素,如果已满,则阻塞当前线程,等待唤醒。如果正常存入了元素,那么唤醒其他阻塞的线程(有些执行take操作的线程因为队列为空而阻塞)

take: 从队列中取一个元素,如果队列为空,则阻塞当前线程,等待唤醒。如果正常取出了元素,那么唤醒其他阻塞的线程(有些执行put操作的线程因为队列满而阻塞)

Object类提供了几个操作来进行当前线程的唤醒和阻塞wait: 阻塞当前线程,其实就是将当前线程放入当前对象的等待集中,释放锁(如果持有锁的话),暂停当前线程。 notify: 唤醒当前对象等待集上的一个线程。 notifyAll: 唤醒当前对象等待集上的所有线程。

基于以上,我们实现一个自己的阻塞队列:

代码语言:javascript
复制
public class MyBlockingQueue1<T> implements MyBlockingQueue<T>{
    private Object[] array;
    private int count=0;
    private int getIndex=0;
    private int putIndex=0;
    public MyBlockingQueue1(int cap){
        array = new Object[cap];
    }
    public synchronized void put(T ele) throws InterruptedException {
        while (isFull()){
            wait();
        }
        array[putIndex++]=ele;
        if (putIndex>=array.length){
            putIndex=0;
        }
        count++;
        notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (isEmpty()){
            wait();
        }
        Object element = array[getIndex++];
        if (getIndex>=array.length){
            getIndex=0;
        }
        count--;
        notifyAll();
        @SuppressWarnings("unchecked")
        T t = (T)element;
        return t;
    }
    private boolean isEmpty(){
        return count==0;
    }
    private boolean isFull(){
        return  count>=array.length;
    }
}

put和take方法都加了synchronized,也就是说这两个方法执行之前都需要先取得同一个对象锁,从而,这两个方法就不可以并行执行。于是我们可以稍微优化一下,比如put和take使用两个不同的锁,这两个操作就不会互相影响了。但也会因此使得count成为了临界资源,count++会发生竞态,我们可以考虑使用一个原子变量类来替代int类型。而且上面介绍提到的唤醒部分,每当成功put或者成功take,我们都唤醒所有线程,其实put操作成功时,我们只想唤醒那些因为队列为空而阻塞的线程,take操作成功时,我们只想唤醒那些因为队列已满而阻塞的线程,而且唤醒一个就够了。于是我们可以使用Condition来使得线程在两个不同的等待队列上进行等待,每次都唤醒特定队列上的一个线程。于是0.2版代码如下:

代码语言:javascript
复制
public class MyBlockingQueue2<T> implements MyBlockingQueue<T>{

    private Object[] array;
    private AtomicInteger count=new AtomicInteger(0);//临界资源,使用原子变量类
    private int getIndex=0;
    private int putIndex=0;

    private ReentrantLock putLock = new ReentrantLock();
    private final Condition notEmpty = putLock.newCondition();//防止过早唤醒
    private ReentrantLock takeLock = new ReentrantLock();
    private final Condition notFull = takeLock.newCondition();//防止过早唤醒

    public MyBlockingQueue2(int cap){
        array = new Object[cap];
    }
    public void put(T ele) throws InterruptedException {
        try {
            putLock.lock();
            while (isFull()){
                notFull.await();
            }
            array[putIndex++]=ele;
            if (putIndex>=array.length){
                putIndex=0;
            }
            int c = count.getAndIncrement();
            if (c==0){
                notEmpty.signal();
            }
        }finally {
            putLock.unlock();
        }

    }

    public  T take() throws InterruptedException {
        try {
            takeLock.lock();
            while (isEmpty()){
                notEmpty.wait();
            }
            Object element = array[getIndex++];
            if (getIndex>=array.length){
                getIndex=0;
            }
            int c = count.getAndDecrement();
            if (c==array.length){
                notFull.signal();
            }
            @SuppressWarnings("unchecked")
            T t = (T)element;
            return t;
        }finally {
            takeLock.unlock();
        }

    }
    private boolean isEmpty(){
        return count.get()==0;
    }
    private boolean isFull(){
        return  count.get()>=array.length;
    }
}

JDK中的阻塞队列实现

我们自己写的这个阻塞队列只是实现了最基本的put和take两个操作,而jdk中的阻塞队列提供的功能更加全面一些。首先,提供了put和take对应的非阻塞方法offer和poll,这两个方法,即使遇到队列为满或为空的情况,也不会阻塞当前线程,而是直接返回false或null。并且还提供了阻塞时间选项,比如,poll时,如果队列为空,可以选择阻塞x秒,如果x秒内还是没能拿到元素,则返回null。其次还提供了比如drainTo、contains、remove等方法来完成一次性取出所有元素,判断元素存在与否,移除一个元素等操作,作为阻塞队列的接口BlockingQueue主要有四个实现类: ArrayBlockingQueue:这个是用数组实现的一个阻塞队列,put和take使用了同一个锁,线程等待队列使用了Condition。 LinkedBlockingQueue:这个是用链表实现的一个阻塞队列,put和take使用了 两个锁,理论上支持更大的并发量。 还有就是PriorityBlockingQueue和SynchronousQueue,一个是优先级阻塞队列,每次都按照优先级来存取元素,另一个是同步队列,其实它内部没有维护队列,而是存入一个元素之后,必须有其他线程将他取走,不然再想put的线程就会被阻塞。这两个队列内部实现跟前两个有所不同,看起来要更复杂一点,比如PriorityBlockingQueue内部是通过堆来维护优先级的,优先级比对我我们可以存入自己的比较器,而SynchronousQueue内部通过Transferer分装了一些操作,这两个队列待独立一篇细说。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Vegout 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自己写一个阻塞队列
  • JDK中的阻塞队列实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档