所以我有一个阻塞队列实现。其中一个Schedular是将一个随机数放入队列,延迟1秒,我实现了另一个包含10个线程池的Schedular,以便从消息队列中调用take()。
重要的是我正在尝试实现的场景是,在从队列中取出单个项目后,线程等待20秒(线程睡眠),我的理解是线程池中的其他9个线程将开始并行工作,而第一个线程等待20秒(其他线程也将等待20秒),但事实并非如此。池中的其他线程似乎根本没有启动。我是BlockingQueues的新手,任何帮助都将不胜感激。
我的代码如下。
公共类BlockingQueueImpl {
public Queue<Integer> messageQueue = new ConcurrentLinkedDeque();
private void putNumber(Integer number){
try{
System.out.println("putting number to the queue: " + number);
messageQueue.add(number);
System.out.println("size of the queue: " +messageQueue.size());
} catch (Exception e){
e.printStackTrace();
}
}
private void getNumber(){
}
private class RunnableGetImpl implements Runnable {
@Override
public void run() {
try{
Integer num = messageQueue.poll();
System.out.println("Polling from queue, number - "+ num);
if(num!=null){
System.out.println("Sleeping thread for 20 sec"+Thread.activeCount());
Thread.sleep(20000);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
private class RunnablePutImpl implements Runnable {
@Override
public void run() {
Random rand = new Random();
int n = rand.nextInt(100);
n += 1;
putNumber(n);
}
}
public static void main(String[] args){
BlockingQueueImpl blockingQueue = new BlockingQueueImpl();
ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1);
executor1.scheduleAtFixedRate(blockingQueue.new RunnablePutImpl(), 0, 1000, TimeUnit.MILLISECONDS);
ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(20);
executor2.scheduleAtFixedRate(blockingQueue.new RunnableGetImpl(), 0, 100, TimeUnit.MILLISECONDS);
}}
发布于 2020-11-19 20:45:40
来自ScheduledThreadPoolExecutor.scheduleAtFixedRate的JavaDoc
如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
因此,您需要启动(安排)尽可能多的工作进程。
在寻找更好的解决方案时,请注意您实际上并未使用 BlockingQueue
您没有实现java.util.concurrent.Blockingqueue,也没有使用它的实现。ConcurrentLinkedDeque只是一个集合,它甚至没有实现Queue。
如果队列为空,ConcurrentLinkedDeque.poll()不会阻塞,只会返回null。
下面是BlockingQueue接口的JavaDocs:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html
使用put()为队列提供一个值。如果BlockingQueue已达到其最大容量,则该操作将被阻止。使用take()删除元素。如果队列为空,这将阻塞。
正确使用这些类将提高应用程序的性能,因为您不会一直轮询某个值。
关于类似问题的答案中有更多详细信息:How to use ConcurrentLinkedQueue?
更新:具有多个生产者/消费者的示例代码
下面的示例代码是从https://riptutorial.com/java/example/13011/multiple-producer-consumer-example-with-shared-global-queue复制的,我和它没有任何从属关系:
下面的代码展示了多个生产者/消费者程序。生产者线程和消费者线程共享相同的全局队列。
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}输出:
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1请注意如何将多个生产者和消费者添加到池中-您需要尽可能多的生产者和消费者来潜在地并行工作。这是你的代码缺少的关键东西--多个工作进程。调度器将对它们进行调度,但它不会神奇地将您要求它调度的单个实例相乘。
显然,您需要根据自己的需求调整生产者和消费者的数量。
https://stackoverflow.com/questions/64910329
复制相似问题