
DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。
队列头元素是最快要过期的元素。

由该图可知

/**
* A mix-in style interface for marking objects that should be
* acted upon after a given delay.
*
* An implementation of this interface must define a
* {@code compareTo} method that provides an ordering consistent with
* its {@code getDelay} method.
*
* @since 1.5
* @author Doug Lea
*/
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
private final transient ReentrantLock lock = new ReentrantLock();
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follwer线程,被唤醒的follwer线程被选举为新的leader线程。
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null;import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/19 23:05
* @mark: show me the code , change the world
*/
public class DelayQueueTest {
static class DelayedEle implements Delayed {
private final long delayTime; //延迟时间
private final long expire; //到期时间
private String data; //数据
public DelayedEle(long delay, String data) {
delayTime = delay;
this.data = data;
expire = System.currentTimeMillis() + delay;
}
/**
* 剩余时间=到期时间-当前时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 优先队列里面优先级规则
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DelayedElement{");
sb.append("delay=").append(delayTime);
sb.append(", expire=").append(expire);
sb.append(", data='").append(data).append('\'');
sb.append('}');
return sb.toString();
}
}
public static void main(String[] args) throws InterruptedException {
// 1 创建延时队列
DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();
// 2 创建延时任务
Random random = new Random();
for (int i = 0; i < 10; i++) {
DelayedEle ele = new DelayedEle(random.nextInt(500), "task-" + i);
delayQueue.offer(ele);
}
System.out.println("开始操作,delayQueue队列大小为:" + delayQueue.size());
// 3 依次取出任务并打印
DelayedEle delayedEle = null;
try {
// 3.1 循环,如果想避免虚假唤醒,则不能把全部元素都打印出来
for (; ; ) {
// 3.2 获取过期的任务并打印
while ((delayedEle = delayQueue.take()) != null) {
System.out.println(delayedEle.toString());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}首先创建延迟任务DelayedEle类,其中delayTime表示当前任务需要延迟多少ms时间过期,expire则是当前时间的ms值加上delayTime的值。
另外,实现了Delayed接口,实现了long getDelay(TimeUnit unit)方法用来获取当前元素还剩下多少时间过期,实现了int compareTo(Delayed o)方法用来决定优先级队列元素的比较规则。
在main函数内首先创建了一个延迟队列,然后使用随机数生成器生成了10个延迟任务,最后通过循环依次获取延迟任务,并打印。运行上面代码,一个可能的输出如下所示。

可见,出队的顺序和delay时间有关,而与创建任务的顺序无关。
插入元素到队列,如果插入元素为null则抛出NullPointerException异常,否则由于是无界队列,所以一直返回true。插入元素要实现Delayed接口。
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock; // 1
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {// 2
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 1 获取但不移除队首元素
E first = q.peek();
if (first == null)
available.await(); // 2
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 3
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null) // 4
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; // 5
try {
available.awaitNanos(delay); // 6
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null) // 7
available.signal();
lock.unlock();
}
}获取并移除队头过期元素,如果没有过期元素则返回null。
/**
* Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 如果队列为空,或者不为空但是对头元素没有过期,则返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}首先获取独占锁,然后获取队头元素,如果队头元素为null或者还没过期则返回null,否则返回队头元素。
计算队列元素个数,包含过期的和没有过期的。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}先获取独占锁,然后调用优先级队列的size方法。
DelayQueue队列内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。

另外队列里面的元素要实现Delayed接口,其中一个是获取当前元素到过期时间剩余时间的接口,在出队时判断元素是否过期了,一个是元素之间比较的接口,因为这是一个有优先级的队列。