前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LinkedBlockingQueue 核心源码解析

LinkedBlockingQueue 核心源码解析

原创
作者头像
JavaEdge
修改2020-04-20 12:13:16
4430
修改2020-04-20 12:13:16
举报
文章被收录于专栏:JavaEdge

孤独,所有人都是孤独的,没有人能独自超脱这一切。 ——玛娅·安杰格

0 前言

LinkedBlockingQueue - 单链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素从队列尾部插入,从队首获取元素.是深入并发编程的基础数据结构.

1 继承体系

  • Queue 作为最基础的接口,定义了队列的三大类基本操作:
  • BlockingQueue 即在 Queue 的基础上加上了阻塞的概念,比如
    • 一直阻塞
    • 阻塞一定时间,返回特殊值

remove 方法,BlockingQueue 类注释中定义的是抛异常,但 LinkedBlockingQueue 中 remove 方法实际是返回 false。

2 属性

2.1 链式存储

  • 节点的数据结构
  • next 为当前元素的下一个,为 null 表示当前节点是最后一个
  • 链表容量
    默认大小为 Integer.MAX_VALUE !显然太大了!禁用!
  • 链表已有元素大小 使用了 AtomicInteger,线程安全
  • 链表头、尾

2.2 锁

  • take 时的锁
  • take 的条件队列,condition 可以简单理解为基于 ASQ 建立的条件队列
  • put 时的锁
  • put 的条件队列

锁有 take 锁和 put 锁,是为了保证队列操作时的线程安全,设计两种锁,是为了 take 和 put 两种操作可以同时进行,而互不影响。

2.3 迭代器

  • 实现了自己的迭代器

3 构造方法

3.1 无参

  • 默认为 Integer 的最大值

3.2 有参

  • 指定容量大小.链表头尾相等,节点值(item)都是 null
  • 已有集合数据public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { // 集合元素不能为 null if (e == null) throw new NullPointerException(); // capacity 代表链表的大小,在这为 Integer.MAX_VALUE // 若集合大小 > IntegerMAX_VALUE,直接抛异常. if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }

4 新增

以 offer 方法为例.

将元素E添加到队列的末尾.

代码语言:txt
复制
public boolean offer(E e) { 
  if (e == null) throw new NullPointerException(); 
  // 如果“队列已满”,则返回false,表示插入失败。 
  final AtomicInteger count = this.count;
  if (count.get() == capacity) return false; 
  int c = -1; 
  // 新建“节点e” 
  Node<E> node = new Node(e); 
  final ReentrantLock putLock = this.putLock; 
  // 获取“插入锁putLock” 
  putLock.lock(); 
  try { 
    // 再次对“队列是不是满”的进行判断。 
    // 若“队列未满”,则插入节点。 
    if (count.get() < capacity) { 
      // 插入节点 enqueue(node);
      // 将“当前节点数量”+1,并返回“原始的数量” 
      c = count.getAndIncrement(); 
      // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。 
      if (c + 1 < capacity)  notFull.signal(); 
    } 
  } finally { 
      // 释放“插入锁putLock” 
      putLock.unlock();
  } 

  // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程 
  if (c == 0) 
    signalNotEmpty(); 
    return c >= 0;
  }

enqueue

  • 将node添加到队列末尾,并设置node为新的尾节点

signalNotEmpty

  • 发出等待信号。 仅能从put/offer调用(否则通常不锁定takeLock),唤醒notEmpty上的等待线程.

新增数据成功后,在适当时机,会唤起 put 的等待线程(队列不满时),或者 take 的等待线程(队列不为空时),这样保证队列一旦满足 put 或者 take 条件时,立马就能唤起阻塞线程,继续运行,保证了唤起的时机不被浪费。

5 取出

以take()为例.取出并返回队列的头。若队列为空,则一直等待。

代码语言:txt
复制
public E take() throws InterruptedException { 
  E x; 
  int c = -1; 
  final AtomicInteger count = this.count; 
  final ReentrantLock takeLock = this.takeLock; 
  // 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常 
  takeLock.lockInterruptibly(); 
  try { 
    // 若“队列为空”,则一直等待。 
    while (count.get() == 0) { 
      notEmpty.await(); 
    } 
    // 取出元素 
    x = dequeue(); 
    // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。 
    c = count.getAndDecrement();
    if (c > 1) notEmpty.signal();
  } finally { 
    // 释放“取出锁” 
    takeLock.unlock(); 
  } 
  // 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。 
  if (c == capacity) 
    signalNotFull(); 
  return x;
}

dequeue

  • 删除队列的头节点,并将表头指向“原头节点的下一个节点”。

signalNotFull

  • 发出等待的信号。 仅能从take/poll调用.唤醒notFull上的等待线程.

6 总结

理解好阻塞队列很重要,经常会被用在线程池的场景中.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 前言
  • 1 继承体系
  • 2 属性
    • 2.1 链式存储
      • 2.2 锁
        • 2.3 迭代器
        • 3 构造方法
          • 3.1 无参
            • 3.2 有参
            • 4 新增
              • enqueue
                • signalNotEmpty
                • 5 取出
                  • dequeue
                    • signalNotFull
                    • 6 总结
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档