摘要:
ConcurrentLinkedQueue 是一个线程安全且 非阻塞 的 无界 队列,它采用先进先出的规则,实现了 AbstractQueue 基础抽象类和 Queue 接口。
它在内部维护了一个 Node 类,有存放数据的 item 和 执行下一个节点的指针 next,全部通过 CAS 来操作。
在前面分析阻塞队列 ArrayBlockingQueue 时候,我们发现对数据的增删都是从数组的第一个元素进行操作, 这里我们可以把 head 节点理解为相同的索引作用。
// 头节点
private transient volatile Node<E> head;
// 尾节点
private transient volatile Node<E> tail;
// ConcurrentLinkedQueue.Node
// 存放数据
volatile E item;
// 下一个节点指针
volatile Node<E> next;
入队列的API,核心逻辑就是:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 循环入队,直到成功
// t 是tail节点
// p 是链表此时此刻的尾结点(也可以理解为入队节点)
for (Node<E> t = tail, p = t;;) {
// q 是尾结点的 next 节点
Node<E> q = p.next;
// 如果q为null,说明到链表尾部了
if (q == null) {
// CAS 更新 next为新节点,失败就重试
if (p.casNext(null, newNode)) {
// 然后更新tail节点
// 这里采用的是一种巧妙的累计更新,也就是说下个 CAS p才会不等于tail
// 相当于循环两次更新一次 tail,所以才有了最下面的两个判断来设置尾结点p
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
// p 和 p的next都为空(非对象为空,还是有next的)
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
// 重置 p节点为下一个入队节点(这一步可以理解为校准,入队节点最终永远要指向最新的尾结点)
p = (p != t && t != (t = tail)) ? t : q;
}
}
可以看到 poll 的大概代码设计是和 offer 差不多的,这里是把 p 做为 头节点来维护(出队节点),同样是每两次 CAS 更新数据,更新一次头节点
public E poll() {
restartFromHead:
for (;;) {
// 从头开始
for (Node<E> h = head, p = h, q;;) {
// 取数据
E item = p.item;
// CAS 置null
if (item != null && p.casItem(item, null)) {
// 更新 head 节点
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 队列为空返回 null(可以看到是非阻塞的)
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
// 重置p节点尾下一个出队节点
else
p = q;
}
}
}
回到问题 TOP 2 ,通过对入队出队的分析,可以分析 ConcurrentLinkedQueue 和 LinkedBlockingQueue 的区别主要还是 一个是非阻塞,一个是阻塞,两者都是线程安全的。
回到问题 TOP 3 ,那场景也就是线程安全且不需要阻塞功能的常规内存队列场景。
判断队列是否为空是通过判断头节点是否为空来实现的
public boolean isEmpty() {
// 查看头节点是否为空
return first() == null;
}
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取头节点的值,顺带更新一波 Head
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
遍历队列,计数,效率较差
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
通过对以上的核心方法的分析,回到问题 TOP 1 可以明白基本都是采用 CAS 自旋来实现的,保证了线程的安全性
对比阻塞队列,ConcurrentLinkedQueue 没有条件变量、锁等那些复杂的东西,代码设计层面尽量是简洁、巧妙。