前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文让你了解java生产消费者模型

一文让你了解java生产消费者模型

作者头像
老马的编程之旅
发布2022-06-22 13:22:26
4330
发布2022-06-22 13:22:26
举报
文章被收录于专栏:深入理解Android

在面试中,多线程问题中,可能会让手写生产消费者模型,所以本篇我们就来讲解一下。

所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域

如果共享数据区已满的话,阻塞生产者继续生产数据放置入内; 如果共享数据区为空的话,阻塞消费者继续消费数据;

在实现生产者消费者问题时,可以采用三种方式: 1.使用Object的wait/notify的消息通知机制; 2.使用Lock的Condition的await/signal的消息通知机制; 3.使用BlockingQueue实现。

wait/notify

Java 中,可以通过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通信。在线程中调用 wait() 方法,将阻塞当前线程,直至等到其他线程调用了调用 notify() 方法或 notifyAll() 方法进行通知之后,当前线程才能从wait()方法出返回

1.wait 该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()之前,线程必须要获得该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。调用wait()方法之后,当前线程会释放锁。如果调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。如果再次获取到锁的话,当前线程才能从wait()方法处成功返回。

2.notify 该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用 notify()时没有持有适当的锁,也会抛出 IllegalMonitorStateException。 该方法任意从WAITTING状态的线程中挑选一个进行通知,使得调用wait()方法的线程从等待队列移入到同步队列中,等待有机会再一次获取到锁,从而使得调用wait()方法的线程能够从wait()方法处退出。调用notify后,当前线程不会马上释放该对象锁,要等到程序退出同步块后,当前线程才会释放锁。

3.notifyAll 该方法与 notify ()方法的工作方式相同,重要的一点差异是: notifyAll 使所有原来在该对象上 wait 的线程统统退出WAITTING状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次能够有机会获取到对象监视器锁。

wait/notify存在的一些问题

1.notify通知过早 假设有两个线程A,B,如果A先运行进行notify ,B再运行进行wait(),则B会永远陷入等待,A通知太早了

代码语言:javascript
复制
public class EarlyNotify {

    private static String lockObject = "";

    public static void main(String[] args) {
        WaitThread waitThread = new WaitThread(lockObject);
        NotifyThread notifyThread = new NotifyThread(lockObject);
        notifyThread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitThread.start();
    }

    static class WaitThread extends Thread {
        private String lock;

        public WaitThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println(Thread.currentThread().getName() + "  进去代码块");
                    System.out.println(Thread.currentThread().getName() + "  开始wait");
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() + "   结束wait");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class NotifyThread extends Thread {
        private String lock;

        public NotifyThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "  进去代码块");
                System.out.println(Thread.currentThread().getName() + "  开始notify");
                lock.notify();
                System.out.println(Thread.currentThread().getName() + "   结束开始notify");
            }
        }
    }
}

NotifyThread会先启动,先调用notify方法。然后WaitThread线程才启动,调用wait方法,但是由于通知过了,wait方法就无法再获取到相应的通知,因此WaitThread会一直在wait方法出阻塞,这种现象就是通知过早的现象。针对这种现象,解决方法,一般是添加一个状态标志,让waitThread调用wait方法前先判断状态是否已经改变了没,如果通知早已发出的话,WaitThread就不再去wait。

代码语言:javascript
复制
public class EarlyNotify {

    private static String lockObject = "";
    private static boolean isWait = true;

    public static void main(String[] args) {
        WaitThread waitThread = new WaitThread(lockObject);
        NotifyThread notifyThread = new NotifyThread(lockObject);
        notifyThread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitThread.start();
    }

    static class WaitThread extends Thread {
        private String lock;

        public WaitThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    while (isWait) {
                        System.out.println(Thread.currentThread().getName() + "  进去代码块");
                        System.out.println(Thread.currentThread().getName() + "  开始wait");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "   结束wait");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class NotifyThread extends Thread {
        private String lock;

        public NotifyThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "  进去代码块");
                System.out.println(Thread.currentThread().getName() + "  开始notify");
                lock.notifyAll();
                isWait = false;
                System.out.println(Thread.currentThread().getName() + "   结束开始notify");
            }
        }
    }
}

总结:在使用线程的等待/通知机制时,一般都要配合一个 boolean 变量值(或者其他能够判断真假的条件),在 notify 之前改变该 boolean 变量的值,让 wait 返回后能够退出 while 循环(一般都要在 wait 方法外围加一层 while 循环,以防止早期通知),或在通知被遗漏后,不会被阻塞在 wait 方法处。这样便保证了程序的正确性。

  1. “假死”状态 这种情况就是如果存在多个生产者和多个消费者,比如我们有个生产者,A,B,C,他们共同向List添加数据,当List满的话,就进行wait,假设当其中一个生产者A获取锁时候,其他B,C都处于wait状态,在wait/notify模型中,notify是随机唤醒一个等待线程的,A调用notify可能还是唤醒的生产者线程,就会造成所有的生产者线程都处于等待状态。
代码语言:javascript
复制
  static class Product implements Runnable {
        List<Integer> mList;
        private final int mMaxLength;

        public Product(List list, int maxLength) {
            mList = list;
            mMaxLength = maxLength;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (mList) {
                    while (mList.size() == mMaxLength) {
                        try {
                            Log.d("Product", Thread.currentThread() + "已经达到最大生产数量 ");
                            Log.d("Product", Thread.currentThread() + " wait");
                            wait();
                            Log.d("Product", Thread.currentThread() + " 退出 wait");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    Log.d("Product", Thread.currentThread() + "添加对象:" + i);
                    mList.add(i);
                    mList.notify();
                }
            }
        }
    }

如上面的代码,假设生产者A将mList添加满,然后调用notify(),有可能唤醒了等待的生产者B,这样B会进入mList.size() == mMaxLength判断,会进行wait,跳不出循环,导致所有的生产者线程都进入了等待。

总结

在Object提供的消息通知机制应该遵循如下这些条件: 1.永远在while循环中对条件进行判断而不是if语句中进行wait条件的判断; 2.使用NotifyAll而不是使用notify。

wait/notifyAll实现生产者-消费者

利用wait/notifyAll实现生产者和消费者代码如下:

代码语言:javascript
复制
 LinkedList<Integer> linkedList = new LinkedList();
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            executorService.submit(new Product(linkedList, 15));
        }

        for (int i = 0; i < 10; i++) {
            executorService.submit(new Consumer(linkedList));
        }
代码语言:javascript
复制
 static class Consumer implements Runnable {
        List<Integer> mList;

        public Consumer(List<Integer> list) {
            mList = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (mList) {
                    while (mList.size() == 0) {

                        try {
                            Log.d("Consumer", Thread.currentThread() + "已经消费完毕 ");
                            Log.d("Consumer", Thread.currentThread() + " wait");
                            wait();
                            Log.d("Consumer", Thread.currentThread() + " 退出 wait");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Integer i = mList.remove(0);
                        Log.d("Product", Thread.currentThread() + "消费对象:" + i);
                    }
                }
            }
        }
    }
代码语言:javascript
复制
static class Product implements Runnable {
        List<Integer> mList;
        private final int mMaxLength;

        public Product(List list, int maxLength) {
            mList = list;
            mMaxLength = maxLength;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (mList) {
                    while (mList.size() == mMaxLength) {
                        try {
                            Log.d("Product", Thread.currentThread() + "已经达到最大生产数量 ");
                            Log.d("Product", Thread.currentThread() + " wait");
                            wait();
                            Log.d("Product", Thread.currentThread() + " 退出 wait");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    Log.d("Product", Thread.currentThread() + "添加对象:" + i);
                    mList.add(i);
                    mList.notify();
                }
            }
        }
    }

使用Lock中Condition的await/signalAll实现生产者-消费者

参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法: 1.相对于wait方法 void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;

2.相对于notify方法 void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。

void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程

使用await/signalAll的demo代码

代码语言:javascript
复制
public class ProductorConsumer {

private static ReentrantLock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();

public static void main(String[] args) {
    LinkedList linkedList = new LinkedList();
    ExecutorService service = Executors.newFixedThreadPool(15);
    for (int i = 0; i < 5; i++) {
        service.submit(new Productor(linkedList, 8, lock));
    }
    for (int i = 0; i < 10; i++) {
        service.submit(new Consumer(linkedList, lock));
    }

}

static class Productor implements Runnable {

    private List<Integer> list;
    private int maxLength;
    private Lock lock;

    public Productor(List list, int maxLength, Lock lock) {
        this.list = list;
        this.maxLength = maxLength;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (list.size() == maxLength) {
                    System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                    full.await();
                    System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                list.add(i);
                empty.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}


static class Consumer implements Runnable {

    private List<Integer> list;
    private Lock lock;

    public Consumer(List list, Lock lock) {
        this.list = list;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (list.isEmpty()) {
                    System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                    empty.await();
                    System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                }
                Integer element = list.remove(0);
                System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                full.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

}

使用BlockingQueue实现生产者-消费者

由于BlockingQueue内部实现就附加了两个阻塞操作。即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止。

BlockingQueue实现生产者-消费者为题,阻塞队列完全可以充当共享数据区域,就可以很好的完成生产者和消费者线程之间的协作。

BlockingQueue实现生产者-消费者代码

代码语言:javascript
复制
public class ProductorConsumer {

    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(queue));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(queue));
        }
    }


    static class Productor implements Runnable {

        private BlockingQueue queue;

        public Productor(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer element = (Integer) queue.take();
                    System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

使用BlockingQueue来实现生产者-消费者代码可以看出来是最整洁的。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-07-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • wait/notify
  • wait/notify存在的一些问题
    • 总结
    • wait/notifyAll实现生产者-消费者
    • 使用Lock中Condition的await/signalAll实现生产者-消费者
    • 使用BlockingQueue实现生产者-消费者
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档