前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程之queue

并发编程之queue

作者头像
lyb-geek
发布2018-03-27 15:03:45
8230
发布2018-03-27 15:03:45
举报
文章被收录于专栏:Linyb极客之路

一、什么是queue

队列是一种特殊的线性表,它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。

在队列这种数据结构中,最先插入的元素将是最先被删除的元素;反之最后插入的元素将是最后被删除的元素,因此队列又称为“先进先出”(FIFO—first in first out)的线性表。

Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。

二、Queue的实现

1、没有实现的阻塞接口的LinkedList: 实现了java.util.Queue接口和java.util.AbstractQueue接口

  内置的不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue

  PriorityQueue 和 ConcurrentLinkedQueue 类在 Collection Framework 中加入两个具体集合实现。

  PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。

  ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小,ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

2、实现阻塞接口的:

  java.util.concurrent 中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。

五个队列所提供的各有不同:

  * ArrayBlockingQueue :一个由数组支持的有界队列。

  * LinkedBlockingQueue :一个由链接节点支持的可选有界队列。

  * PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。

  * DelayQueue :一个由优先级堆支持的、基于时间的调度队列。

  * SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。

三、队列的操作

  add()增加一个元索,如果队列已满,则抛出一个IIIegaISlabEepeplian异常

  remove(),移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常

  element(),返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常

  offer(),添加一个元素并返回true,如果队列已满,则返回false

  poll(),移除并返问队列头部的元素,如果队列为空,则返回null

  peek(),返回队列头部的元素,如果队列为空,则返回null

  put(),添加一个元素,如果队列满,则阻塞

  take(),移除并返回队列头部的元素,如果队列为空,则阻塞

其中remove、element、offer 、poll、peek 属于Queue接口。

阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。注意:poll和peek方法出错进返回null。因此,向队列中插入null值是不合法的。最后,我们有阻塞操作put和take。put方法在队列满时阻塞,take方法在队列空时阻塞。

四、常用的Queue实现类

1.非线程安全的

LinkedList

值得注意的是LinkedList类实现了Queue接口,因此我们可以把LinkedList当成Queue来用。

2.线程安全的

非阻塞队列

ConcurrentLinkedQueue

源码片段:

public E poll() {

restartFromHead:

for (;;) {

for (Node<E> h = head, p = h, q;;) {

E item = p.item;

if (item != null && p.casItem(item, null)) { // CAS算法

// Successful CAS is the linearization point

// for item to be removed from this queue.

if (p != h) // hop two nodes at a time

updateHead(h, ((q = p.next) != null) ? q : p);

return item;

}

else if ((q = p.next) == null) {

updateHead(h, p);

return null;

}

else if (p == q)

continue restartFromHead;

else

p = q;

}

}

}

ConcurrentLinkedQueue使用CAS算法保证线程安全,但它不支持阻塞。

阻塞队列

ArrayBlockingQueue

源码片段:

public E poll() {

final ReentrantLock lock = this.lock;// 这个类中共用同一个锁

lock.lock();

try {

return (count == 0) ? null : extract();

} finally {

lock.unlock();

}

}

在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。

LinkedBlockingQueue

代码片段:

public boolean offer(E e) {

if (e == null) throw new NullPointerException();

final AtomicInteger count = this.count;

if (count.get() == capacity)

return false;

int c = -1;

Node<E> node = new Node(e);

final ReentrantLock putLock = this.putLock;// 写锁

putLock.lock();

try {

if (count.get() < capacity) {

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

}

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

return c >= 0;

}

public E poll() {

final AtomicInteger count = this.count;

if (count.get() == 0)

return null;

E x = null;

int c = -1;

final ReentrantLock takeLock = this.takeLock;// 读锁

takeLock.lock();

try {

if (count.get() > 0) {

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

}

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

基于链表的阻塞队列,因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

但值得注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue与LinkedBlockingQueue的不同之处:

a、锁的实现不同

ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁,由此也意味着两者无法真正并行运行。

LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock。

b、内部元素操作不同

ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的,即不会产生任何额外的对象实例。

LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,这在长时间内需要高效并发地处理大批量数据的系统中,对GC和性能会有一定影响。

c、队列初始化方式不同

ArrayBlockingQueue实现的队列中必须指定队列的大小。

LinkedBlockingQueue实现的队列中可以不指定队列的大小,默认是Integer.MAX_VALUE。

PriorityBlockingQueue

代码片段:

public boolean offer(E e) {

if (e == null)

throw new NullPointerException();

final ReentrantLock lock = this.lock;

lock.lock();

int n, cap;

Object[] array;

while ((n = size) >= (cap = (array = queue).length))

tryGrow(array, cap);

try {

Comparator<? super E> cmp = comparator;

if (cmp == null)

siftUpComparable(n, e, array);

else

siftUpUsingComparator(n, e, array, cmp);// 支持优先级排序

size = n + 1;

notEmpty.signal();

} finally {

lock.unlock();

}

return true;

}

PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力,里面存储的对象必须是实现Comparable接口。队列通过这个接口的compare方法确定对象的priority。

规则是:当前和其他对象比较,如果compare方法返回负数,那么在队列里面的优先级就比较高。

值得注意的是: 如果将PriorityBlockingQueue队列中的全部元素循环打印出来,你会发现里面的元素并不是全部按优先级排序的,但是队列头部元素的优先级肯定是最高的。

DelayQueue

源码片段:

public boolean offer(E e) {

final ReentrantLock lock = this.lock;

lock.lock();

try {

q.offer(e);

if (q.peek() == e) {

leader = null;

available.signal();

}

return true;

} finally {

lock.unlock();

}

}

DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。里面存储的对象必须是实现Delayed 接口。

五、队列常用场景

符合生产者-消费者模型的都可以使用队列。

另当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。

LinkedBlockingQueue 多用于任务队列(单线程发布任务,任务满了就停止等待阻塞,当任务被完成消费少了又开始负载 发布任务)

ConcurrentLinkedQueue 多用于消息队列(多个线程发送消息,先随便发来,不计并发的-cas特点)

单生产者,单消费者 用 LinkedBlockingqueue

多生产者,单消费者 用 LinkedBlockingqueue

单生产者 ,多消费者 用 ConcurrentLinkedQueue

多生产者 ,多消费者 用 ConcurrentLinkedQueue

DelayQueue 这个队列适用用来重试机制实现;缓存系统的设计,缓存中的对象,超过了空闲时间,需要从缓存中移出;任务调度系统,能够准确的把握任务的执行时间。

六、小结

在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。

另外他们都是线程安全(除了linkedList)的,不用考虑线程同步问题。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-02-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Linyb极客之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档