前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java多线程六脉神剑-关冲剑(Condition)、中冲剑(BlockingQueue)

Java多线程六脉神剑-关冲剑(Condition)、中冲剑(BlockingQueue)

原创
作者头像
BLACK595
发布2024-09-27 20:31:20
850
发布2024-09-27 20:31:20

Condition(关冲剑)

关冲剑:关冲剑拙滞古朴,Condition 常用于线程间的条件等待,以相对基础和传统的方式实现线程间的协调。

使用synchronized和Object的wait()和notify()可以实现等待/通知模型,然而使用这种方式,JVM会随机唤醒一个等待的线程。而使用Condition,可以将线程进行更精细的分组管理。通过创建多个Condition对象,可以将不同需求的线程等待在不同的Condition上。在唤醒时,可以精确地选择唤醒特定Condition上等待的线程,而不是像notify()那样随机唤醒或者notifyAll那样全部唤醒。

因而使用Condition的前提必须是获取到Lock锁,就像使用Object.wait()时必须需要先获取到Synchronized一样。

Condition方法详解

  • await():线程等待,直到被唤醒或中断,等待的过程中会释放Lock锁,类似于Object.wait()
  • await(long time, TimeUnit unit):线程等待指定的时间,或被唤醒或被中断,类似于Object.wait(long timeoutMillis)
  • awaitNanos(long nanosTimeout):线程等待指定纳秒的时间,或被唤醒或被中断,类似于Object.wait(long timeoutMillis, int nanos)
  • awaitUntil(Date deadline):线程等待直到指定的截止日期,或被唤醒,或被中断。
  • awaitUninterruptibly():线程等待直到被唤醒,即使在等待时被中断也不会返回。
  • signal():唤醒同Condition下一个等待的线程。类似于 Object.notify()
  • signalAll():唤醒同Condition下所有等待的线程。类似于 Object.notifyAll()
代码语言:txt
复制
1ms【毫秒】 = 1000μs【微秒】    1μs【微秒】 = 1000ns【纳秒】

举个栗子🌰

今典的生产者消费者问题,消费者在消费之前必须要等待生产者生产,在业务中生产消费可能还会有一些前提,比如要某某数据拿到了才能生产,这里我们用两个Ready方法模拟生产条件是否满足。其次,如果消费者先拿到锁,然而生产者还没生产数据,消费者将会阻塞等待。

代码如下:

代码语言:java
复制
public class ConditionCase {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private Object object = null;
    private boolean produced = false; //是否已经生产
    private boolean consumed = false; //是否已经消费

    public void producer() {
        lock.lock();
        try {
            // 假设生产条件未满足 或 还未进行消费,等待
            while (!isProductionReady() || consumed) {
                condition.await();
            }
            // 进行生产操作
            object = new Object();
            produced = true;
            System.out.println("生产完成,可以消费");
            // 通知消费者可以消费
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consumer() {
        lock.lock();
        try {
            // 假设消费条件未满足,等待
            while (!isConsumptionReady() ||!produced) {
                condition.await();
            }
            // 进行消费操作
            object = null;
            consumed = true;
            System.out.println("消费完成");
            // 通知生产者可以生产
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private boolean isProductionReady() {
        // 这里可以根据实际情况判断生产条件是否满足
        return true;
    }

    private boolean isConsumptionReady() {
        // 这里可以根据实际情况判断消费条件是否满足
        return true;
    }

    public static void main(String[] args) {
        ConditionCase conditionCase = new ConditionCase();

        new Thread(() -> conditionCase.consumer()).start();
        new Thread(() -> conditionCase.producer()).start();
    }
}

执行结果:

代码语言:txt
复制
生产完成,可以消费
消费完成

BlockingQueue(中冲剑)

中冲剑:中冲剑气势雄迈,BlockingQueue 在多线程数据交换中起着重要的作用,其容量和阻塞特性就像中冲剑大开大阖。

BlockingQueue(阻塞队列),在多线程中使用的队列,具有普通队列存放元素的特性。同时提供两个特性方法,put和take,当队列满时,生产者线程无法放进元素,会一直等待直到队列中元素被消费;当队列为空时,消费者线程获取元素时获取不到会一直等待,直到队列可用。

BlockingQueue常用方法

作用

一直阻塞

阻塞超时

返回特殊值

抛异常

插入

put(E e)

offer(E e, long timeout, TimeUnit unit)

offer(E e)

add( E e)

删除

take()

poll(long timeout, TimeUnit unit)

poll()

remove()

获取头部

-

-

peek()

element()

  • 一直阻塞:当队列中元素放不进去或取不出来时,插入和删除的线程将会一直阻塞等待,直到队列可用。
  • 阻塞超时:当线程阻塞时,给设置一个超时时间,在超时时间内阻塞还没恢复,offer()将返回false,poll()将返回null。
  • 返回特殊值:如果插入成功将返回true,插入失败返回false。从头部删除一个元素,删除失败返回null。从头部获取元素,队列为空返回null。
  • 抛异常:当队列中满,元素放不进去时,将会抛出IllegalStateException: Queue full;当队列空,无法取出元素时,抛出NoSuchElementException异常。

举个栗子🌰

组装加工一台电脑,我们需要加工CPU、主板、内存、显卡、电源等部件,每个部件难易程度不同,加工时间也不相同,假设我们派两个人加工,当一个人加工完一个部件后,立马去加工还没完成的,这样就能充分利用每个人的效能,最快完成全部部件的加工。

我们将全部部件加工的任务放到阻塞队列中,两个线程来领取任务,如果能够领取到,代表任务还没做完,就消费队列中的任务。

代码实现:

代码语言:java
复制
public class BlockingQueueCase {
    //初始任务队列
    static BlockingQueue<Task> blockingQueue;
    public static void produce(List<Task> taskList) throws InterruptedException {
        blockingQueue = new ArrayBlockingQueue<>(taskList.size());
        //将任务放到任务队列中
        for (Task task : taskList) {
            blockingQueue.put(task);
        }
    }

    /**
     * 消费任务
     * @param consumerNum 消费者数量
     */
    public static void consume(int consumerNum, CountDownLatch latch){
        for(int i = 1; i <= consumerNum; i++){
            //消费者消费任务
            Thread thread = new Thread(() -> {
                Task task;
                //当任务队列中还有需要执行的任务
                while((task = blockingQueue.poll()) != null){
                    //业务执行
                    try {
                        Thread.sleep(task.getTime());
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.println(Thread.currentThread().getName() + "完成任务:"+task.getName()+",耗时:"+task.getTime()+"毫秒");
                    latch.countDown();
                }
            },"线程"+i);
            thread.start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //模拟5个待执行任务
        List<Task> taskList = new ArrayList<>();
        taskList.add(new Task("CPU",2000L));
        taskList.add(new Task("主板",800L));
        taskList.add(new Task("内存",500L));
        taskList.add(new Task("显卡",1000L));
        taskList.add(new Task("电源",200L));
        //5个任务放到队列中
        produce(taskList);
        //latch用于主线程等待线程把任务都执行完毕
        CountDownLatch latch = new CountDownLatch(taskList.size());
        //2个消费者消费任务
        consume(2,latch);
        latch.await();
        System.out.println("任务都执行完毕");
    }
}

执行结果:

常用队列实现类

  1. ArrayBlockingQueue:数组实现的有界阻塞队列,按照先入先出(FIFO)排序(先进队列的元素先被消费),在队列初始化时,设置数组长度并可以指定公平锁或非公平锁(非公平锁:先等待消费的线程不一定先拿到锁)。
  2. LinkedBlockingQueue:链表实现的有界队列,按照先入先出(FIFO)排序,队列大小默认为Integer.MAX_VALUE。
  3. PriorityBlockingQueue:支持排序的优先无界队列,默认初始容量为11,当队列使用达到一定比例(通常75%),会进行扩容,扩容后容量变为之前的2倍。在初始化时我们可以指定初始容量,并可以实现Comparator指定元素排序规则。
  4. SynchronousQueue:内部不存储元素,它的容量为0。每一个插入操作必须等待一个取出操作,反之亦然,也就是插入线程必须等待一个取出线程。在线程池中,如果希望创建任务后立即执行,而不需要在等待队列中排队,就可以使用SynchronousQueue。
  5. DelayQueue:基于使用PriorityQueue的无界阻塞队列,用于存放实现了Delayed接口的对象,元素只有在其指定的延迟时间到期后才能从队列中取出。按照延迟时间的长短对元素进行排序,延迟时间最短的元素排在队列头部。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Condition(关冲剑)
  • Condition方法详解
  • 举个栗子🌰
  • BlockingQueue(中冲剑)
  • BlockingQueue常用方法
  • 举个栗子🌰
  • 常用队列实现类
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档