首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Java并发编程之阻塞队列

Java并发编程之阻塞队列

作者头像
Java微观世界
发布2025-01-21 08:05:30
发布2025-01-21 08:05:30
1660
举报
文章被收录于专栏:springbootspringboot

1、概念

队列

队列就可以想成是一个数组,从一头进入,一头出去,排队买饭

阻塞队列

BlockingQueue 阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

  • 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞

为什么需要BlockingQueue

  • 在多线程领域:所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又需要被唤醒
  • 使用BlockingQueue好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了

2、BlockingQueue接口实现

BlockingQueue阻塞队列是属于一个接口,底下有七个实现类

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列有界,但是界限非常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

3、BlockingQueue核心方法

3.1、阻塞队列api之抛异常

代码语言:javascript
复制
public class BlockingQueueExceptionDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        //往队列中塞元素
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));

        try {
            //抛出 java.lang.IllegalStateException: Queue full
            System.out.println(blockingQueue.add("XXX"));
        } catch (Exception e) {
            System.err.println(e);
        }

        //检查队列是否有值,并返回队列第一个元素
        System.out.println(blockingQueue.element());

        //从队列中取元素
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());

        try {
            //抛出 java.util.NoSuchElementException
            System.out.println(blockingQueue.remove());
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}

运行结果:

代码语言:javascript
复制
Connected to the target VM, address: '127.0.0.1:59946', transport: 'socket'
true
true
true
java.lang.IllegalStateException: Queue full
a
a
b
c
java.util.NoSuchElementException
Disconnected from the target VM, address: '127.0.0.1:59946', transport: 'socket'

Process finished with exit code 0

3.2、阻塞队列api之返回布尔值

代码语言:javascript
复制
public class BlockingQueueBooleanDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
		//往队列中塞元素
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("x"));

        //检查是否有值,并返回队列第一个元素
        System.out.println(blockingQueue.peek());
        
		//从队列中取元素
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }
}

运行结果:

代码语言:javascript
复制
true
true
true
false
a
a
b
c
null

Process finished with exit code 0

3.3、阻塞队列api之阻塞

代码语言:javascript
复制
public class BlockingQueueBlockedDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        new Thread(()->{
            try {
                blockingQueue.put("a");
                blockingQueue.put("b");
                blockingQueue.put("c");
                blockingQueue.put("x");//将会阻塞,直到主线程take()
                System.out.println("it was blocked.");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);

        try {

            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();

            System.out.println("Blocking...");
            blockingQueue.take();//最终将会阻塞在这里

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

代码语言:javascript
复制
it was blocked.
Blocking...

Process finished with exit code 130

3.4、阻塞队列api之超时控制

代码语言:javascript
复制
public class BlockingQueueTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println("Offer.");
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));

        System.out.println("Poll.");
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
    }
}

运行结果:

代码语言:javascript
复制
Offer.
true
true
true
false
Poll.
a
b
c
null

Process finished with exit code 0

4、SynchronousQueue阻塞队列

  • SynchronousQueue没有容量
  • 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue
  • 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
代码语言:javascript
复制
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put A ");
                blockingQueue.put("A");

                System.out.println(Thread.currentThread().getName() + "\t put B ");
                blockingQueue.put("B");

                System.out.println(Thread.currentThread().getName() + "\t put C ");
                blockingQueue.put("C");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            try {

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take A ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take B ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take C ");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }
}

运行结果:

代码语言:javascript
复制
t1	 put A 
t2	 take A 
t1	 put B 
t2	 take B 
t1	 put C 
t2	 take C 

Process finished with exit code 0
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-01-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、概念
  • 2、BlockingQueue接口实现
  • 3、BlockingQueue核心方法
    • 3.1、阻塞队列api之抛异常
    • 3.2、阻塞队列api之返回布尔值
    • 3.3、阻塞队列api之阻塞
    • 3.4、阻塞队列api之超时控制
  • 4、SynchronousQueue阻塞队列
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档