首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用多个线程(在一个池中)访问BlockingQueue不能像预期的那样并行工作

使用多个线程(在一个池中)访问BlockingQueue不能像预期的那样并行工作
EN

Stack Overflow用户
提问于 2020-11-19 19:01:28
回答 1查看 314关注 0票数 0

所以我有一个阻塞队列实现。其中一个Schedular是将一个随机数放入队列,延迟1秒,我实现了另一个包含10个线程池的Schedular,以便从消息队列中调用take()。

重要的是我正在尝试实现的场景是,在从队列中取出单个项目后,线程等待20秒(线程睡眠),我的理解是线程池中的其他9个线程将开始并行工作,而第一个线程等待20秒(其他线程也将等待20秒),但事实并非如此。池中的其他线程似乎根本没有启动。我是BlockingQueues的新手,任何帮助都将不胜感激。

我的代码如下。

公共类BlockingQueueImpl {

代码语言:javascript
运行
复制
public Queue<Integer> messageQueue = new ConcurrentLinkedDeque();

private void putNumber(Integer number){
   try{
       System.out.println("putting number to the queue: " + number);
       messageQueue.add(number);
       System.out.println("size of the queue: " +messageQueue.size());

   } catch (Exception e){
       e.printStackTrace();
   }
}

private void getNumber(){

}


private class RunnableGetImpl implements Runnable {

    @Override
    public void run() {
        try{
            Integer num = messageQueue.poll();
            System.out.println("Polling from queue, number - "+ num);
            if(num!=null){
                System.out.println("Sleeping thread for 20 sec"+Thread.activeCount());
                Thread.sleep(20000);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


private class RunnablePutImpl implements Runnable {

    @Override
    public void run() {
        Random rand = new Random();
        int n = rand.nextInt(100);
        n += 1;
        putNumber(n);


    }


}

public static void main(String[] args){

    BlockingQueueImpl blockingQueue = new BlockingQueueImpl();


    ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1);
    executor1.scheduleAtFixedRate(blockingQueue.new RunnablePutImpl(), 0, 1000, TimeUnit.MILLISECONDS);

    ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(20);
    executor2.scheduleAtFixedRate(blockingQueue.new RunnableGetImpl(), 0, 100, TimeUnit.MILLISECONDS);

}

}

EN

回答 1

Stack Overflow用户

发布于 2020-11-19 20:45:40

来自ScheduledThreadPoolExecutor.scheduleAtFixedRate的JavaDoc

如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。

因此,您需要启动(安排)尽可能多的工作进程。

在寻找更好的解决方案时,请注意您实际上并未使用 BlockingQueue

您没有实现java.util.concurrent.Blockingqueue,也没有使用它的实现。ConcurrentLinkedDeque只是一个集合,它甚至没有实现Queue

如果队列为空,ConcurrentLinkedDeque.poll()不会阻塞,只会返回null

下面是BlockingQueue接口的JavaDocs:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html

使用put()为队列提供一个值。如果BlockingQueue已达到其最大容量,则该操作将被阻止。使用take()删除元素。如果队列为空,这将阻塞。

正确使用这些类将提高应用程序的性能,因为您不会一直轮询某个值。

关于类似问题的答案中有更多详细信息:How to use ConcurrentLinkedQueue?

更新:具有多个生产者/消费者的示例代码

下面的示例代码是从https://riptutorial.com/java/example/13011/multiple-producer-consumer-example-with-shared-global-queue复制的,我和它没有任何从属关系:

下面的代码展示了多个生产者/消费者程序。生产者线程和消费者线程共享相同的全局队列。

代码语言:javascript
运行
复制
import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

输出:

代码语言:javascript
运行
复制
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

请注意如何将多个生产者和消费者添加到池中-您需要尽可能多的生产者和消费者来潜在地并行工作。这是你的代码缺少的关键东西--多个工作进程。调度器将对它们进行调度,但它不会神奇地将您要求它调度的单个实例相乘。

显然,您需要根据自己的需求调整生产者和消费者的数量。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64910329

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档