Java提供很多线程安全的容器,为开发人员在并发编程场景下使用,通常我们会更加关注业务实现,而不关心底层结构。但我们应该理解这些容器的原理和使用场景,以方便我们的开发和遇到问题的分析,并且有时候也能借鉴一下大神们的实现思想。
使用线程安全队列的场景有很多,Java在实现同步机制时,多线程对竞争资源进行操作时,同一时刻只能有一个线程可以操作,其他线程进行阻塞等待,这时,需要使用一种容器队列来装载等待的线程,在入队和出队时候保证线程的安全性,也就保证同步机制下的线程可以安全的执行。Java提供两种方式来实现阻塞式和非阻塞式,阻塞式使用锁实现,非阻塞式使用CAS方式实现。使用阻塞队列和非阻塞队列的场景还有很多,比较常用的就是我们常说的生产者\消费者模型。
ConcurrentLinkedQueue——无界非阻塞队列
查看类图和实现的方法,可以看出ConcurrentLinkedQueue实现了队列操作add、offer、poll、peek和几个集合的属性操作isEmpty、size、contains、remove、iterator等。还包含volatile修饰的头结点head和尾节点tail,tail节点并不是一直指向最后一个节点。
查看节点的数据结构为Node,包含数据项item和next节点,可以看出ConcurrentLinkedQueue底层是一个单向链表结构的容器。Node提供三个使用Unsafe类实现的方法:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOff set, cmp, val);
}
}
下面我们继续查看ConcurrentLinkedQueue方法实现:构造、add、offer、poll、peek:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
无参构造:head节点、tail节点是同一个元素为null的Node元素;
将集合转成ConcurrentLinkedQueue的构造:遍历集合,将元素都放到队列中,并且第一个元素我head节点,最后一个元素我tail节点;
向队列中添加元素,add方法内部直接调用的offer方法:
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
checkNotNull(e);
// 用新元素 构建Node
final Node<E> newNode = new Node<E>(e);
// 从tail节点开始迭代链表
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果,p节点的next节点q为空,说明p是最后一个节点,则用cas方式将新元素设置为p节点的next节点
if (q == null) {
if (p.casNext(null, newNode)) {
// 如果cas设置成功,说明p节点的next节点添加成功,这时如果p节点不是尾节点,则设置新节点为tail节点
// 如果p是尾节点,则不设置新添加的节点为尾节点,尾节点可能存在最后一个节点或者最后一个节点的父节点中间
if (p != t)
casTail(t, newNode);
return true;
}
}
// 如果p节点的next节点q是同一个节点,说明发生了poll操作,迭代节点指向的头结点已经失效,需要指向新的头结点
// 每次迭代时,仍然需要判断尾节点是否变更,变更则需要重新从新的尾节点进行迭代
else if (p == q)
p = (t != (t = tail)) ? t : head;
// 从尾节点找next节点为空时,都需要判断一下尾节点是否发生变化,因为可能其他线程已经添加进新的节点了,尾节点可能发生了变化
// 如果尾节点没变化,则指向迭代节点的next节点
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
public E poll() {
restartFromHead:
for (;;) {
// 从头结点开始迭代
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果p结点元素不为null,则通过CAS方式设置将p的元素设为null
if (item != null && p.casItem(item, null)) {
// 如果cas设置成功,说明头结点已经被设置为null了,如果p节点还是迭代开始的头结点,则不更新头结点
// 如果p节点不是迭代开始的头结点,这时需要重新设置头结点的位置
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 说明p节点为null并且p节点的next节点也为null,队列为空,返回null,并且更新头结点
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 说明其他线程已经将头结点的元素取走,并且更新了头结点位置,需要重新从新的头结点迭代
else if (p == q)
continue restartFromHead;
// 当指向q=p.next之前,其他线程已经添加了一个元素到队列,这时q不为null,则将p指向q,继续下次迭代
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
// CAS方式更新头结点,使用p节点代替原头结点h
if (h != p && casHead(h, p))
// CAS更新头结点成功,使用延迟方式,将原头结点的next节点指向自己,也就是让原头结点失效
h.lazySetNext(h);
}
当队列为空的时:
- 正常情况下会执行第二个分支,q=p.next & q==null,返回null,调用updateHead时,发现p==h不做操作;
- 当执行第二个分支时,其他线程添加了新元素,导致q !=null,这时会执行最后一个分支,令p == q,继续执行下次迭代;
当head节点为null时:
- 类似队列为空时,执行q=p.next时,刚好有其他线程添加了元素,会执行最后一个分支,令p == q,继续执行下次迭代;
当head节点存在元素时:
- 会执行第一个语句,通过cas方式将第一个元素设为null,可能此时有多个线程执行cas操作,但只有一个能够成功。成功的线程会更新头结点,更新节点分两种情况,第一种情况是将头结点指向下一个节点,第二种情况是队列为空,重新将头结点指向null。更新完头结点时,会将原头结点的next指向自己,相当于从队列中,彻底移除了弹出的元素。
- 这时就有可能出现第三个语句的情况,说明头结点已经更新,需重新从新的头结点迭代;
poll后,头结点不会总是指向第一个元素的,如下图:
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
peek方法只是读取第一个元素,不会做移除操作,并且会校准真正头元素的节点,将头结点指向真正的第一个元素。
ConcurrentLinkedQueue主要就是依靠CAS方式实现的线程安全的队列,而且没有队列长度的限制,向队列添加元素会放到队尾,从队列获取元素会从队首,遵循FIFO原则。
Java提供了一个阻塞队列的接口——BlockingQueue,在队列的基础上增加可阻塞添加元素和可阻塞获取元素的方法。
BlockingQueue的实现包括ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue:
ArrayBlockingQueue——数组实现的有界阻塞队列
// 负责装实际元素的数组
final Object[] items;
// 出队下标
int takeIndex;
// 入队下标
int putIndex;
// 队列长度
int count;
// 队列操作的锁对象
final ReentrantLock lock;
// 空队列的出队等待条件,等待队列不空的唤醒,进行出队
private final Condition notEmpty;
// 满队列的入队等待条件,等待队列不满的唤醒,进行入队
private final Condition notFull;
入队操作
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 数组环状处理
if (++putIndex == items.length)
putIndex = 0;
count++;
// 队列不空条件的唤醒,允许阻塞出队线程继续操作
notEmpty.signal();
}
出队操作
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 数组环状处理
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 队列不满条件的唤醒,允许阻塞入队线程继续操作
notFull.signal();
return x;
}
ArrayBlockingQueue使用环状数组作为容器,使用入队下标进行入队操作,使用出队下标进行出队。add、offer、put都是通过enqueue入队操作实现;take、poll都是通过dequeue出队操作实现;每次入队和出队都需要获取同一把锁,没有获得锁的将被阻塞;另外,当队列为空时候,进行出队操作的线程也将被阻塞,当有入队时则会唤醒阻塞的出队操作线程;当队列满时,进行入队操作的线程将被阻塞,当有出队时则会唤醒阻塞的入队操作线程。
LinkedBlockingQueue——使用链表实现的有界阻塞队列
// 队列容量
private final int capacity;
// 队列长度
private final AtomicInteger count = new AtomicInteger();
// 对首节点
transient Node<E> head;
// 队尾节点
private transient Node<E> last;
// 出队锁
private final ReentrantLock takeLock = new ReentrantLock();
// 空队列的出队等待条件,等待队列不空的唤醒,进行出队
private final Condition notEmpty = takeLock.newCondition();
// 入队锁
private final ReentrantLock putLock = new ReentrantLock();
// 满队列的入队等待条件,等待队列不满的唤醒,进行入队
private final Condition notFull = putLock.newCondition();
// 链表的节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
入队操作
private void enqueue(Node<E> node) {
// 新节点放在尾节点的next,并将尾节点指向新入队节点
last = last.next = node;
}
出队操作
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
LinkedBlockingQueue是使用链表为容器实现的有界阻塞队列,与ArrayBlockingQueue不同的是,LinkedBlockingQueue使用两把锁:put锁和take锁,可以并行的进行入队和出队操作,入队不仅需要唤醒空队列时出队阻塞的线程,还需要唤醒满队列入队的线程;出队不仅需要唤醒满队列时入队阻塞的线程,还需要唤醒空队列出队的线程。
PriorityBlockingQueue——使用优先级队列实现的无界阻塞队列
// 默认容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 数组实现的队列
private transient Object[] queue;
// 队列长度
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 锁
private final ReentrantLock lock;
// 空队列的出队等待条件,等待队列不空的唤醒,进行出队
private final Condition notEmpty;
// 用于控制容器增长的volatile类型的变量锁
private transient volatile int allocationSpinLock;
入队操作
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 判断是否需要扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 根据是否有比较器对象,来将新元素插到队列中
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
扩容操作
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 通过判断volatile状态锁,使用cas方式保证只有一个线程可以获得扩容权限
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 小于64,每次增长size+2,否则增长size>>1,也就是长度的一半
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 插入时候不会判断是否有界,但扩容时候超过最大长度会报内存溢出
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 将扩容锁,置回初始状态
allocationSpinLock = 0;
}
}
// 释放线程执行权,因为这个时候一定有其他线程正在执行扩容操作
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
// JNI方式复制数组
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
出队操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 没有元素可以出队列则阻塞,等待插入时候的非空条件唤醒
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 出队时,每次讲数组也就是树的根弹出
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
// 根据最小二叉堆算法,进行调整根元素
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
PriorityBlockingQueue使用同一把锁,来进行入队和出队,通过使用二叉堆算法实现队列的顺序性,每次入队时候通过二叉堆找到元素的位置,出队时都是弹出根元素,并且将整个二叉堆进行再平衡操作。另外PriorityBlockingQueue是无界阻塞队列,但是扩容操作超过最长长度会报内存溢出,并且通过volatile修饰的变量和变量CAS操作实现的容器扩容锁。
DelayQueue是基于PriorityBlockingQueue实现的无界阻塞队列,队列元素需要实现了Delayed接口,所以,DelayQueue队列中的元素是按照延迟时间进行排序的,每次需要弹出将要到期的元素,类似于RabbitMq的延迟队列。另外,DelayQueue使用Leader/Follower模型,leader是出队线程,当出队线程操作完成时,将唤醒其他follower线程竞争leader,然后新的leader进行出队。
SynchronousQueue是使用阻塞队列实现的同步队列,分为公平模式和非公平模式,SynchronousQueue本身不存储任何元素,需要保证消费者和生产者的线程以类似于同步的方式进行信息的传递。在Executors.newCachedThreadPool的线程池的阻塞队列就是使用的SynchronousQueue。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Java提供了一个双端阻塞队列的接口——BlockingDeque,BlockingDeque接口继承自BlockingQueue接口,BlockingDeque定义了对于队列的操作是包含对队尾和队首尾部分别进行入队和出队的操作。
入队首:putFirst(E e)、addFirst(E e)、offerFirst(E e)
入队尾:putLast(E e)、addLast(E e)、offerLast(E e)
队首出:takeFirst()、pollFirst()
队尾出:taskLast()、pollLast()
BlockingDeque是接口定义,LinkedBlockingDeque是Java提供的唯一实现类,下面我们通过源码分析双端阻塞队列的实现算法。
// 链表元素结构
static final class Node<E> {
// 元素对象
E item;
// 前置节点
Node<E> prev;
// 后续节点
Node<E> next; Node(E x) {
item = x;
}
}
// 队列头结点
transient Node<E> first;
// 队列尾节点
transient Node<E> last;
// 队列元素数量
private transient int count;
// 容量
private final int capacity;
// 锁
final ReentrantLock lock = new ReentrantLock();
// 当队列空的时候,出队线程会进行阻塞,根据此条件在有线程进行入队时候,唤醒这些因为队列空而阻塞的出队线程
private final Condition notEmpty = lock.newCondition();
// 当队列满的时候,入队线程会进行阻塞,根据此条件在有线程进行出队时候,唤醒这些因为队列满而阻塞的入队线程
private final Condition notFull = lock.newCondition();
下面分析一下入队和出队操作
private boolean linkFirst(Node<E> node) {
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
private boolean linkLast(Node<E> node) {
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
private E unlinkFirst() {
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
private E unlinkLast() {
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
LinkedBlockingDeque是有界阻塞的双端队列,通过属性可以看出LinkedBlockingDeque使用同一个锁进行入队和出队操作;count与capacity表示,队列中元素数量和容量;链表元素包括前置和后续两个属性,便于从队列的两端进行遍历。从linkFirst()、linkLast()、linkFirst()、linkLast()可以看出,对于头尾节点的操作与LinkedBlockingQueue很像,区别在于LinkedBlockingQueue使用两把锁,LinkedBlockingDeque使用一把锁。
阻塞队列是一种线程安全的队列,支持在并发环境下对队列的入队和出队操作,适用于生产者与消费者模式,入队线程就是生产者,出队线程就是消费者。当队列为空(或队列为满)时,消费者(或生产者)线程会进入阻塞状态,使用线程的阻塞/通知模型,来通知阻塞的线程。当多个消费者(或生产者)同时进行队列操作时,没有得到锁的线程将被阻塞,等待其他线程释放锁,再去竞争锁。
双端阻塞队列适用于工作密取模式,工作密取也是一种生产者\消费者模型,每个消费者都有自己的双端队列,当自己的队列完成之后,会从其他消费者的双端队列的尾部(正常消费是从队列的头部)进行消费,这样减少竞争关系。