摘要:
我们先来看一下它的实现类图,它实现了 Delayed、BlockingQueue 接口和 AbstractQueue 基础类,从实现的功能上看,它首先是一个阻塞队列,然后 Delayed 接口是标记给定延迟后执行的对象,结合类名也可以大致的分析出:DelayQueue 是一个 延时阻塞 队列
// 保证线程安全的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 标记取元素时是否有线程在排队
private Thread leader = null;
// 是否可取的条件变量
private final Condition available = lock.newCondition();
入队逻辑很简单:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
出队非阻塞API,核心逻辑就是:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
出队阻塞API,核心逻辑就是:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆顶元素
E first = q.peek();
// 为空,等待被唤醒
if (first == null)
available.await();
// 堆顶不为空
else {
// 获取元素的超时时间
long delay = first.getDelay(NANOSECONDS);
// 已过期就取走
if (delay <= 0)
return q.poll();
// 没过期往下走
// 这里设置为 null 是放在还有别的线程持有没有释放导致内存泄漏
first = null; // don't retain ref while waiting
// 校验是否有等待线程,有就等待leader线程取完或有新加入的元素唤醒它
if (leader != null)
available.await();
// 没有等待线程,就把自己设置为等待线程,然后等待超时时间唤醒重试
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 唤醒后就把 leader 置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有等待线程并且队列还有数据,就唤醒下一个线程来取
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
回到问题 TOP 2 ,通过对入队列和出队列的分析,其实现原理想必已经明白,就是在队列的基础上增加了时间维度的优先级,然后通过锁和条件变量来控制取/放流程。
和我们开头分析的一样,DelayQueue是一个 阻塞延时且无界 的队列,它使用的是优先级队列+时间维度来实现。回到问题 TOP 1 延时队列场景主要适用于定时任务,但是对于内存中的延时队列往往不能用于重要的业务场景(毕竟还是内存队列,宕机了就没咯),所以可以应用于一些基础类库,不太重要的业务定时清理和处理等。