首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >回归Java基础:LinkedBlockingQueue阻塞队列解析

回归Java基础:LinkedBlockingQueue阻塞队列解析

作者头像
捡田螺的小男孩
发布2020-04-15 18:02:15
发布2020-04-15 18:02:15
5370
举报

前言

整理了阻塞队列LinkedBlockingQueue的学习笔记,希望对大家有帮助。有哪里不正确,欢迎指出,感谢。

LinkedBlockingQueue的概述

LinkedBlockingQueue的继承体系图

我们先来看看LinkedBlockingQueue的继承体系。使用IntelliJ IDEA查看类的继承关系图形

  • 蓝色实线箭头是指类继承关系
  • 绿色箭头实线箭头是指接口继承关系
  • 绿色虚线箭头是指接口实现关系。

LinkedBlockingQueue实现了序列化接口 Serializable,因此它有序列化的特性。LinkedBlockingQueue实现了BlockingQueue接口,BlockingQueue继承了Queue接口,因此它拥有了队列Queue相关方法的操作。

LinkedBlockingQueue的类图

类图来自Java并发编程之美

LinkedBlockingQueue主要特性:

  1. LinkedBlockingQueue底层数据结构为单向链表。
  2. LinkedBlockingQueue 有两个Node节点,一个head节点,一个tail节点,只能从head取元素,从tail添加元素。
  3. LinkedBlockingQueue 容量是一个原子变量count,它的初始值为0。
  4. LinkedBlockingQueue有两把ReentrantLock的锁,一把控制元素入队,一把控制出队,保证在并发情况下的线程安全。
  5. LinkedBlockingQueue 有两个条件变量,notEmpty 和 notFull。它们内部均有一个条件队列,存放着出入队列被阻塞的线程,这其实是生产者-消费者模型。

LinkedBlockingQueue的重要成员变量

代码语言:javascript
复制
//容量范围,默认值为 Integer.MAX_VALUE
 
private final int capacity;
 


 
//当前队列元素个数
 
private final AtomicInteger count = new AtomicInteger();
 


 
//头结点
 
transient Node<E> head;
 


 
//尾节点
 
private transient Node<E> last;
 


 
//take, poll等方法的可重入锁
 
private final ReentrantLock takeLock = new ReentrantLock();
 


 
//当队列为空时,执行出队操作(比如take )的线程会被放入这个条件队列进行等待
 
private final Condition notEmpty = takeLock.newCondition();
 


 
//put, offer等方法的可重入锁
 
private final ReentrantLock putLock = new ReentrantLock();
 


 
//当队列满时, 执行进队操作( 比如put)的线程会被放入这个条件队列进行等待
 
private final Condition notFull = putLock.newCondition();
 

LinkedBlockingQueue的构造函数

LinkedBlockingQueue有三个构造函数:

  1. 无参构造函数,容量为Integer.MAX

代码语言:javascript
复制
public LinkedBlockingQueue() {
 
 this(Integer.MAX_VALUE);
 
}
 

2. 设置指定容量的构造器

代码语言:javascript
复制
public LinkedBlockingQueue(int capacity) {
 
 if (capacity <= 0) throw new IllegalArgumentException();
 
 //设置队列大小
 
 this.capacity = capacity;
 
 //new一个null节点,head、tail节点指向该节点
 
 last = head = new Node<E>(null);
 
}
 

3. 传入集合,如果调用该构造器,容量默认也是Integer.MAX_VALUE

代码语言:javascript
复制
 public LinkedBlockingQueue(Collection<? extends E> c) {
 
 //调用指定容量的构造器
 
 this(Integer.MAX_VALUE);
 
 //获取put, offer的可重入锁
 
 final ReentrantLock putLock = this.putLock;
 
        putLock.lock(); 
 
 try {
 
 int n = 0;
 
 //循环向队列中添加集合中的元素
 
 for (E e : c) {
 
 if (e == null)
 
 throw new NullPointerException();
 
 if (n == capacity)
 
 throw new IllegalStateException("Queue full");
 
 //将队列的last节点指向该节点
 
                enqueue(new Node<E>(e));
 
 ++n;
 
 }
 
 //更新容量值
 
            count.set(n);
 
 } finally {
 
 //释放锁
 
            putLock.unlock();
 
 }
 
 }
 

LinkedBlockingQueue底层Node类

Node源码

代码语言:javascript
复制
static class Node<E> {
 
 // 当前节点的元素值
 
    E item;
 
 // 下一个节点的索引
 
 Node<E> next;
 
 //节点构造器
 
 Node(E x) { 
 
     item = x;
 
 }
 
 }
 

LinkedBlockingQueue的节点符合单向链表的数据结构要求:

  • 一个成员变量为当前节点的元素值
  • 一个成员变量是下一节点的索引
  • 构造方法的唯一参数节点元素值。

Node节点图

item表示当前节点的元素值,next表示指向下一节点的指针

LinkedBlockingQueue常用操作

offer操作

入队方法,其实就是向队列的尾部插入一个元素。如果元素为空,抛出空指针异常。如果队列已满,则丢弃当前元素,返回false,它是非阻塞的

offer源代码

offer方法源码如下:

代码语言:javascript
复制
 public boolean offer(E e) {
 
 //为空直接抛空指针
 
 if (e == null) throw new NullPointerException();
 
 final AtomicInteger count = this.count;
 
 //如果当前队列满了的话,直接返回false
 
 if (count.get() == capacity)
 
 return false;
 
 int c = -1;
 
 //构造新节点
 
 Node<E> node = new Node<E>(e);
 
 获取putLock独占锁
 
 final ReentrantLock putLock = this.putLock;
 
        putLock.lock();
 
 try {
 
 //判断队列是否已满
 
 if (count.get() < capacity) {
 
 //进队列
 
                enqueue(node);
 
 //递增元素计数
 
                c = count.getAndIncrement();
 
 //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
 
 if (c + 1 < capacity)
 
                    notFull.signal();
 
 }
 
 } finally {
 
 //释放锁
 
            putLock.unlock();
 
 }
 
 //如果容量为0,则
 
 if (c == 0)
 
 //激活 notEmpty 的条件队列,唤醒被阻塞的线程
 
            signalNotEmpty();
 
 return c >= 0;
 
 }
 

enqueue方法源码如下:

代码语言:javascript
复制
private void enqueue(Node<E> node) {
 
 //从尾节点加进去
 
 last = last.next = node;
 
 }
 

为了形象生动,我们用一张图来看看往队列里依次放入元素A和元素B。图片参考来源【细谈Java并发】谈谈LinkedBlockingQueue

signalNotEmpty方法源码如下

代码语言:javascript
复制
private void signalNotEmpty() {
 
 //获取take独占锁
 
 final ReentrantLock takeLock = this.takeLock;
 
    takeLock.lock();
 
 try {
 
 //唤醒notEmpty条件队列里被阻塞的线程
 
       notEmpty.signal();
 
 } finally {
 
 //释放锁
 
       takeLock.unlock();
 
 }
 
 }
 
offer执行流程图

基本流程:

  • 判断元素是否为空,如果是,就抛出空指针异常。
  • 判读队列是否已满,如果是,添加失败,返回false。
  • 如果队列没满,构造Node节点,上锁。
  • 判断队列是否已满,如果队列没满,Node节点在队尾加入队列待。
  • 加入队列后,判断队列是否还有空闲,如果是,唤醒notFull的阻塞线程。
  • 释放完锁后,判断容量是否为空,如果是,唤醒notEmpty的阻塞线程。

put操作

put方法也是向队列尾部插入一个元素。如果元素为null,抛出空指针异常。如果队列己满则阻塞当前线程,直到队列有空闲插入成功为止。如果队列空闲则插入成功,直接返回。如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。

put源代码

代码语言:javascript
复制
 public void put(E e) throws InterruptedException {
 
 ////为空直接抛空指针异常
 
 if (e == null) throw new NullPointerException();
 
 int c = -1;
 
 // 构造新节点
 
 Node<E> node = new Node<E>(e);
 
 //获取putLock独占锁
 
 final ReentrantLock putLock = this.putLock;
 
 final AtomicInteger count = this.count;
 
 //获取独占锁,它跟lock的区别,是可以被中断
 
        putLock.lockInterruptibly();
 
 try {
 
 //队列已满线程挂起等待
 
 while (count.get() == capacity) {
 
                notFull.await();
 
 }
 
 //进队列
 
            enqueue(node);
 
 //递增元素计数
 
            c = count.getAndIncrement();
 
 //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
 
 if (c + 1 < capacity)
 
                notFull.signal();
 
 } finally {
 
 //释放锁
 
            putLock.unlock();
 
 }
 
 //如果容量为0,则
 
 if (c == 0)
 
 //激活 notEmpty 的条件队列,唤醒被阻塞的线程
 
            signalNotEmpty();
 
 }
 
put流程图

基本流程:

  • 判断元素是否为空,如果是就抛出空指针异常。
  • 构造Node节点,上锁(可中断锁)
  • 判断队列是否已满,如果是,阻塞当前线程,一直等待。
  • 如果队列没满,Node节点在队尾加入队列。
  • 加入队列后,判断队列是否还有空闲,如果是,唤醒notFull的阻塞线程。
  • 释放完锁后,判断容量是否为空,如果是,唤醒notEmpty的阻塞线程。

poll操作

从队列头部获取并移除一个元素, 如果队列为空则返回 null, 该方法是不阻塞的。

poll源代码

poll方法源代码

代码语言:javascript
复制
 public E poll() {
 
 final AtomicInteger count = this.count;
 
 //如果队列为空,返回null
 
 if (count.get() == 0)
 
 return null;
 
        E x = null;
 
 int c = -1;
 
 //获取takeLock独占锁
 
 final ReentrantLock takeLock = this.takeLock;
 
        takeLock.lock();
 
 try {
 
 //如果队列不为空,则出队,并递减计数
 
 if (count.get() > 0) {
 
                x = dequeue();
 
                c = count.getAndDecrement();
 
 ////容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程
 
 if (c > 1)
 
                    notEmpty.signal();
 
 }
 
 } finally {
 
 //释放锁
 
            takeLock.unlock();
 
 }
 
 if (c == capacity)
 
 //唤醒notFull条件队列里被阻塞的线程
 
            signalNotFull();
 
 return x;
 
 }
 

dequeue方法源代码

代码语言:javascript
复制
 //出队列
 
 private E dequeue() {
 
 //获取head节点
 
 Node<E> h = head;
 
 //获取到head节点指向的下一个节点
 
 Node<E> first = h.next;
 
 //head节点原来指向的节点的next指向自己,等待下次gc回收
 
        h.next = h; // help GC
 
 // head节点指向新的节点
 
        head = first;
 
 // 获取到新的head节点的item值
 
        E x = first.item;
 
 // 新head节点的item值设置为null
 
        first.item = null;
 
 return x;
 
 }
 

为了形象生动,我们用一张图来描述出队过程。图片参考来源【细谈Java并发】谈谈LinkedBlockingQueue

signalNotFull方法源码

代码语言:javascript
复制
 private void signalNotFull() {
 
 //获取put独占锁
 
 final ReentrantLock putLock = this.putLock;
 
        putLock.lock();
 
 try {
 
 ////唤醒notFull条件队列里被阻塞的线程
 
            notFull.signal();
 
 } finally {
 
 //释放锁
 
            putLock.unlock();
 
 }
 
 }
 
poll流程图

基本流程:

  • 判断元素是否为空,如果是,就返回null。
  • 加锁
  • 判断队列是否有元素,如果没有,释放锁
  • 如果队列有元素,则出队列,获取数据,容量计数器减一。
  • 判断此时容量是否大于1,如果是,唤醒notEmpty的阻塞线程。
  • 释放完锁后,判断容量是否满,如果是,唤醒notFull的阻塞线程。

peek操作

获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。该方法是不阻塞的。

peek源代码

代码语言:javascript
复制
 public E peek() {
 
 //队列容量为0,返回null
 
 if (count.get() == 0)
 
 return null;
 
 //获取takeLock独占锁
 
 final ReentrantLock takeLock = this.takeLock;
 
        takeLock.lock();
 
 try {
 
 Node<E> first = head.next;
 
 //判断first是否为null,如果是直接返回
 
 if (first == null)
 
 return null;
 
 else
 
 return first.item;
 
 } finally {
 
 //释放锁
 
            takeLock.unlock();
 
 }
 
 }
 
peek流程图

基本流程:

  • 判断队列容量大小是否为0,如果是,就返回null。
  • 加锁
  • 获取队列头部节点first
  • 判断节点first是否为null,是的话,返回null。
  • 如果fist不为null,返回节点first的元素。
  • 释放锁。

take操作

获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列 不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。

take源代码

代码语言:javascript
复制
 public E take() throws InterruptedException {
 
        E x;
 
 int c = -1;
 
 final AtomicInteger count = this.count;
 
 //获取takeLock独占锁
 
 final ReentrantLock takeLock = this.takeLock;
 
 //获取独占锁,它跟lock的区别,是可以被中断
 
        takeLock.lockInterruptibly();
 
 try {
 
 //当前队列为空,则阻塞挂起
 
 while (count.get() == 0) {
 
                notEmpty.await();
 
 }
 
 //)出队并递减计数
 
            x = dequeue();
 
            c = count.getAndDecrement();
 
 if (c > 1)
 
 //激活 notEmpty 的条件队列,唤醒被阻塞的线程
 
                notEmpty.signal();
 
 } finally {
 
 //释放锁
 
            takeLock.unlock();
 
 }
 
 if (c == capacity)
 
 //激活 notFull 的条件队列,唤醒被阻塞的线程
 
            signalNotFull();
 
 return x;
 
 }
 
take流程图

基本流程:

  • 加锁
  • 判断队列容量大小是否为0,如果是,阻塞当前线程,直到队列不为空。
  • 如果队列容量大小大于0,节点出队列,获取元素x,计数器减一。
  • 判断队列容量大小是否大于1,如果是,唤醒notEmpty的阻塞线程。
  • 释放锁。
  • 判断队列容量是否已满,如果是,唤醒notFull的阻塞线程。
  • 返回出队元素x

remove操作

删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。

remove方法源代码

代码语言:javascript
复制
 public boolean remove(Object o) {
 
 //为空直接返回false
 
 if (o == null) return false;
 
 //双重加锁
 
        fullyLock();
 
 try {
 
 //边历队列,找到元素则删除并返回true
 
 for (Node<E> trail = head, p = trail.next;
 
                 p != null;
 
                 trail = p, p = p.next) {
 
 if (o.equals(p.item)) {
 
 //执行unlink操作
 
                    unlink(p, trail);
 
 return true;
 
 }
 
 }
 
 return false;
 
 } finally {
 
 //解锁
 
            fullyUnlock();
 
 }
 
 }
 

双重加锁,fullyLock方法源代码

代码语言:javascript
复制
void fullyLock() {
 
 //putLock独占锁加锁
 
        putLock.lock();
 
 //takeLock独占锁加锁
 
        takeLock.lock();
 
 }
 

unlink方法源代码

代码语言:javascript
复制
 void unlink(Node<E> p, Node<E> trail) {
 
        p.item = null;
 
        trail.next = p.next;
 
 if (last == p)
 
 last = trail;
 
 //如果当前队列满 ,则删除后,也不忘记唤醒等待的线程
 
 if (count.getAndDecrement() == capacity)
 
            notFull.signal();
 
 }
 

fullyUnlock方法源代码

代码语言:javascript
复制
 void fullyUnlock() {
 
 //与双重加锁顺序相反,先解takeLock独占锁
 
        takeLock.unlock();
 
        putLock.unlock();
 
 }
 
remove流程图

基本流程

  • 判断要删除的元素是否为空,是就返回false。
  • 如果要删除的元素不为空,加双重锁
  • 遍历队列,找到要删除的元素,如果找不到,返回false。
  • 如果找到,删除该节点,返回true。
  • 释放锁

size操作

获取当前队列元素个数。

代码语言:javascript
复制
 public int size() {
 
 return count.get();
 
 }
 

由于进行出队、入队操作时的 count是加了锁的,所以结果相比ConcurrentLinkedQueue 的 size 方法比较准确。

总结

  • LinkedBlockingQueue底层通过单向链表实现。
  • 它有头尾两个节点,入队操作是从尾节点添加元素,出队操作是对头节点进行操作。
  • 它的容量是原子变量count,保证szie获取的准确性。
  • 它有两把独占锁,保证了队列操作原子性。
  • 它的两把锁都配备了一个条件队列,用来存放阻塞线程,结合入队、出队操作实现了一个生产消费模型。

Java并发编程之美中,有一张图惟妙惟肖描述了它,如下图:

参看与感谢

  • 《Java并发编程之美》
  • 阻塞队列之LinkedBlockingQueue
  • Java并发之LinkedBlockingQueue
  • 【细谈Java并发】谈谈LinkedBlockingQueue

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 捡田螺的小男孩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • LinkedBlockingQueue的概述
    • LinkedBlockingQueue的继承体系图
    • LinkedBlockingQueue的类图
  • LinkedBlockingQueue的重要成员变量
  • LinkedBlockingQueue的构造函数
  • LinkedBlockingQueue底层Node类
    • Node源码
    • Node节点图
  • LinkedBlockingQueue常用操作
    • offer操作
      • offer源代码
      • offer执行流程图
    • put操作
      • put源代码
      • put流程图
    • poll操作
      • poll源代码
      • poll流程图
    • peek操作
      • peek源代码
      • peek流程图
    • take操作
      • take源代码
      • take流程图
    • remove操作
      • remove方法源代码
      • remove流程图
    • size操作
  • 总结
  • 参看与感谢
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档