其实具体业务无所谓,这次解决的问题是“统计、累加类业务类型”,这里的业务就用”统计动物园中所有种类动物数量的总和”,类比代替了。
我要写一个接口,吐出 “动物园所有种类动物的总和”。已知目前有 15种动物,现在有现成的查询每种动物数量的接口,每种动物都要调用RPC接口去别的系统查询。且耗时较高。
根据上面的描述,线性去查询,调用15次RPC接口,时间花费巨大,所以放弃单线程模式。打算使用多线程的方法,进来请求后,分发 15个线程去查每一种动物的数据,返回结果。用多线程的话,在项目中肯定首先考虑使用线程池。
<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="15" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="30" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="30" />
<!-- 线程池维护线程所允许的空闲时间 默认为60s-->
<property name="keepAliveSeconds" value="180" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
ThreadPoolTaskExecutor 是Spring 对JUC包内的ThreadPoolExecutor上的封装,能配置Bean,注入SpringIOC 容器中,交给Spring管理
或者springBoot:
@Configuration
@EnableAsync
public class AsyncConfig {
public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
@Bean(name = ASYNC_EXECUTOR_NAME)
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(new CopyingDecorator());
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
/**
* 查询数量使用的线程池
*/
@Autowired
@Qualifier("threadPool")
private ThreadPoolTaskExecutor threadPool;
public long getAllAnimalCount(int accountType, String account) {
try {
// 初始化返回结果
AtomicLong resultValue = new AtomicLong(0);
// 获取所有的动物类型
AllTypeEnum[] enumValues = AllTypeEnum.values();
// 开启倒计时协调器
CountDownLatch countDownLatch = new CountDownLatch(enumValues .length);
// 用线程池分发线程分配处理每一个类型
for (AllTypeEnum tempEnum : enumValues ) {
threadPool.execute(new AnimalCountThread(account, accountType, tempEnum.getType(), resultValue, countDownLatch));
}
// 等所有线程都处理完之后再拿返回结果
countDownLatch.await();
return resultValue.get();
} catch (InterruptedException e1) {
log.error("出现线程中断异常", e1);
} catch (Exception e2) {
log.error("出现未知异常", e2);
}
return 0;
}
/**
* 查询动物数量线程
*
* @author XXX
* @date 2020/05/14
*/
@Data
@Slf4j
public class AnimalCountThread implements Runnable {
/**
* 账号
*/
private String account;
/**
* 账户类型
*/
private int accountType;
/**
* 动物类型 来自枚举
*/
private int type;
/**
* 累加的目标值
*/
private AtomicLong targetValue;
/**
* 栅栏
*/
private CountDownLatch countDownLatch;
/**
* 构造函数
*
* @param account 账号
* @param accountType 账号类型
* @param type 动物类型
* @param targetValue 累加的目标值
* @param countDownLatch 栅栏
*/
public AnimalCountThread (String account, int accountType, int type, AtomicLong targetValue, CountDownLatch countDownLatch) {
this.account = account;
this.accountType = accountType;
this.type = type;
this.targetValue = targetValue;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType());
if (typeEnum != null) {
//获取具体业务Bean
CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl");
long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType());
targetValue.getAndAdd(num);
}
}catch (Exception e) {
log.error("线程执行出现异常",e);
} finally {
countDownLatch.countDown();
}
}
}
1、线程中是无法直接使用注解注入JavaBean的,所以我从Spring容器里拿的。或者也可以不定义这个线程,使用匿名内部类的方法。
2、累计的目标值,直接使用 AtomicLong 省得自己去同步。
3、用CountDownLatch 等所有线程都处理完,主线程再拿返回结果。
4、CountDownLatch 在子线程中,一定要保证被调用到 countDown()。
5、线程池配置拒绝策略,另外三种都丢弃了任务,所以用交给主线程的这种方法比较适合当前业务。
6、线程池的配置队列长度:要是追求性能的话不能过长。越长耗时越长,接口性能越差。
7、接口最外层要合理使用缓存,缓解压力,在对外RPC接口出还可以配置限流。 由于运用了多线程,快进快出, 限流是为了减小峰值。快进快出的话即使限流。 吞吐量也会比不用“多线程”大。
8、一定要压测一下,对于线程池的配置,也可以根据压测结果,调配。
上面的实现方式,由于线程实例是实现Runable接口的方式,Runable run() 方法没有返回值的原因,所以用了公共的参数,AtomicLong 在线程内部累计计算的结果。而且用了CountDownLatch 进行同步操作,来保证主线程获取结果时,所有子任务处理完毕。
如果我们用其他方式时可以不用这两步。
先说线程池 +Callable + Future的方式。
一、Callable接口是jdk 1.4 以后提供的,能返回值,并且能抛异常。 public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } Callable一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本: <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); 因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。 二、Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
同上
/**
* 查询数量使用的线程池
*/
@Autowired
@Qualifier("threadPool")
private ThreadPoolTaskExecutor threadPool;
@Override
public long getAllAnimalCount(int accountType, String account) {
try {
// 初始化返回结果
Long resultValue = 0L;
// 获取所有的预警类型
AllTypeEnum[] enumValues = AllTypeEnum.values();
// 初始化,Future结果容器
List<Future<Long>> futureList = new ArrayList<>();
// 分发任务
for (AllTypeEnum tempEnum : enumValues) {
Future<Long> tempResult = threadPool.submit(new AnimalCountTask (account, accountType, tempEnum.getType()));
futureList.add(tempResult);
}
// 获取所有结果
for (Future<Long> tempFuture : futureList) {
try {
if (tempFuture.get() != null) {
resultValue += tempFuture.get(); // 会阻塞,直到这个任务执行完毕。
}
} catch (Exception e) {
log.error("getAllAnimalCount,线程执行出现异常", e);
}
}
return resultValue;
} catch (Exception e) {
log.error("[getAllAnimalCount]出现异常", e);
}
return 0;
}
/**
* 查询动物数量任务 Callable版本
*
* @author XXX
* @date 2020/05/14
*/
@Data
@Slf4j
public class AnimalCountTask implements Callable<Long> {
/**
* 账号
*/
private String account;
/**
* 账户类型
*/
private int accountType;
/**
* 动物类型
*/
private int animalType;
public AnimalCountTask(String account, int accountType, int animalType) {
this.account = account;
this.accountType = accountType;
this.animalType= animalType;
}
@Override
public Long call() throws Exception {
AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType());
if (typeEnum != null) {
//获取具体业务Bean
CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl");
long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType());
return num;
}
}
return null;
}
}
这种方式的实现,可以看到 获取结果 resultValue += tempFuture.get(); 时会阻塞。循环获取的时候,假如你第二个任务用时最长,那他在for循环的第二次时候,等半天才接着处理其他的。
这个问题呢,可以优化。我想哪个子任务先做完,我就先获取那个子任务的结果,而不是傻傻的线性的一个任务一个任务的看。
JDK 8 提供了 CompletionService 具有这样的功能。它的实现类内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。这个类是为线程池中Task的执行结果服务的,即为Executor中Task返回Future而服务的。 CompletionService的实现目标是“任务先完成可优先获取到,按完成先后顺序排序” public interface CompletionService<V> { // 提交 Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); // 获取 Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
同上
/**
* 查询数量使用的线程池
*/
@Autowired
@Qualifier("threadPool")
private ThreadPoolTaskExecutor threadPool;
@Override
public long getAllAnimalCount(int accountType, String account) {
try {
// 初始化返回结果
Long resultValue = 0L;
// 获取所有的动物类型
AllTypeEnum[] enumValues = AllTypeEnum.values();
// 实例化 CompletionService
CompletionService<Long> completionService = new ExecutorCompletionService<>(threadPool);
// 用CompletionService提交分发任务
for (AllTypeEnum tempEnum : enumValues) {
completionService.submit(new AnimalCountTask(account, accountType, tempEnum.getType()));
}
// 拿取返回值并计算总和
for (AllTypeEnum tempEnum : enumValues ) {
try {
Long value = completionService.take().get();// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
if (value != null) {
resultValue += value;
}
} catch (Exception e) {
log.error("[getAllAnimalCount]线程执行出现异常", e);
}
}
return resultValue;
} catch (Exception e) {
log.error("[getAllAnimalCount]出现异常", e);
}
return 0;
}
同上Callable的实现
/**
* 查询动物数量任务 Callable版本
*
* @author XXX
* @date 2020/05/14
*/
@Data
@Slf4j
public class AnimalCountTask implements Callable<Long> {
/**
* 账号
*/
private String account;
/**
* 账户类型
*/
private int accountType;
/**
* 动物类型
*/
private int animalType;
public AnimalCountTask(String account, int accountType, int animalType) {
this.account = account;
this.accountType = accountType;
this.animalType= animalType;
}
@Override
public Long call() throws Exception {
AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType());
if (typeEnum != null) {
//获取具体业务Bean
CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl");
long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType());
return num;
}
}
return null;
}
}
说一下,Future.get()取结果时,为什么try catch异常。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。