线程治理最重要的是线程池了,之前我讲过,但是,还有两大法宝就是future 和 callable
1、不能返回值,子线程去做任务的时候,它是不会返回结果的
也就是run方法,在runnable接口里是void返回值,我们重写了之后,也是不可以修改的
2、不能抛出checked Exception
看这里,IDEA只给我们提供了try catch异常捕获,但是是无法往外抛出
当然,这是run方法定义的问题,一是void返回值,二是并没有定义异常
那么Runnable为什么要这样设计呢?
首先,runable可以往外抛的话,接收者是线程 ,线程不是程序员编写的,抛出去也没机会处理
所以定义的是我们只能在run方法里面捕获
看下源码
FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
void返回值,且run方法,没有 throws Exception
针对这亮点问题.诞生了
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
这里,就有返回值了,且抛出去了异常
长耗时的操作,交给子线程,执行好之后,通知主线程结果,主线程不需要等,这就是future
callable是有返回值的,通过Future.get获取callable接口返回的执行结果
还可以通过Future.isDone()来判断任务是否已经执行完了,以及取消任务,给任务限时
可以说通过future来操作callable
call未执行完毕之前,可以通过Future.get将主线程阻塞,call执行完毕,主线程拿到结果,主线程再切换到runnable状态
所以,Future是一个存储器,存储call这个任务的结果,两者相互配合。
get方法有5中情况
get方法立即获得结果
get方法将把主线程阻塞,直到任务完成
前两种是普遍的
get方法会抛出ExecutionException执行异常,与call方法抛出的异常类型无关
future是可以把任务取消的,如果取消再get,会抛出CancellationExecption,取消异常
get方法有一个重载方法,可以传入延迟时间,时间到了还没获取到结果,get方法会抛出TimeoutException超时异常
get(long timeout,TimeUnit unit)方法,如果call在规定时间完成了任务,返回正常值,没有的话抛出异常,
那么我们不能简单的抛出异常,调用方不用人家做了,还需要通知取消这个任务
取消任务,也要思考很多
上面简单认识了下方法,这里做总结,如何用,何时用这些方法
get
带时间参数get
cancel
isDone:这里注意仅仅执行完毕,但不能判断成功与否
isCancelled:是否被取消
线程池summbit任务的时候,会返回一个future,但这个future是空的,任务执行完毕,线程池便会把这个结果填入返回的future,而不是创建新的,
public class OneFuture {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
Future<Integer> future = service.submit(new CallableTask());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}
static class CallableTask implements Callable<Integer>{
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
}
}
}
public class OneFutureLambda {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
Callable<Integer> callable = ()->{
Thread.sleep(3000);
return new Random().nextInt();
};
Future<Integer> future = service.submit(callable);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}
}
/**
* 描述: 演示批量提交任务时,用List来批量接受结果
*/
public class MutiFutures {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
Callable<Integer> callable = ()->{
Thread.sleep(3000);
return new Random().nextInt();
};
ArrayList<Future> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Future<Integer> future = service.submit(callable);
futures.add(future);
}
for (int i = 0; i < 20; i++) {
Future<Integer> future = futures.get(i);
try {
Integer integer = future.get();
System.out.println(integer);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
/**
* 演示get方法过程中,抛出异常
* for循环演示Exception,演示只在get方法执行时抛出异常
*/
public class GetException {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
Callable<Integer> callable = ()->{
throw new IllegalArgumentException("callable抛出异常");
};
Future<Integer> future = service.submit(callable);
try {
for (int i = 0; i < 5; i++) {
System.out.println(i);
Thread.sleep(500);
}
System.out.println(future.isDone());
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("IllegalArgumentException异常");
} catch (ExecutionException e) {
e.printStackTrace();
System.out.println("ExecutionException 异常");
}
}
}
通过结果可以看到,并没有马上的抛出,而是等到get的时候,才报错,而且错误类型时Execution异常
/**
* 需要注意超时后处理,要调用future.cancel
* 演示cancel,参数true 和false 代表是否中断正在执行的任务
*/
public class TimeOut {
private static final Ad DEFAULT_AD = new Ad("无网络时候的默认广告");
private static final ExecutorService exec = Executors.newFixedThreadPool(10);
Callable callable = ()->{
try{
Thread.sleep(3000);
}catch (InterruptedException e){
System.out.println("sleep期间被中断了");
return new Ad("被中断时候的默认广告");
}
return new Ad("正常广告");
};
public void printAd(){
Future<Ad> f = exec.submit(callable);
Ad ad;
try {
ad = f.get(2000,TimeUnit.MILLISECONDS) ;
} catch (InterruptedException e) {
ad = new Ad("被中断时候的默认广告");
} catch (ExecutionException e) {
ad = new Ad("异常时候的默认广告");
} catch (TimeoutException e) {
ad = new Ad("超时时候的默认广告");
System.out.println("超时,未获取到广告");
boolean cancel = f.cancel(true);
System.out.println("cancel的结果"+cancel);
}
exec.shutdown();
System.out.println(ad.toString());
}
public static void main(String[] args) {
TimeOut timeOut = new TimeOut();
timeOut.printAd();
}
static class Ad{
String name;
public Ad(String name){
this.name = name;
}
@Override
public String toString() {
return "Ad{" +
"name='" + name + '\'' +
'}';
}
}
}
超时的话,我们会做一个处理,就是去cancel这个任务,但是cancel也是可以选择是否去中断这个任务的,
值得注意的是:cancel之后,就不能去做get的了,会报cancel异常的,注意哈,
刚才用了cancel方法,这里说下注意点:
如果这个任务还没有来得及执行,会正常取消,返回true,没什么问题,但是如果任务已经完成,或者已经取消了
再执行cancel方法是会执行失败的,方法返回false
还有就是注意参数,true的话,会去中断任务
这里分析一个问题,就是传入false的的时候,任务还是正常运行,那这里cancel有什么用呢??
假设这个任务,并没有向上面的代码一样,做好了中断的异常捕获处理,那么这个false就会发挥作用,
而ture则适用于任务能处理interrupt
false适用于,一些还没开始的任务,和不能处理interrupt的任务、不清楚任务是否支持取消的任务
算是一种保守的策略
FutureTask和callable类似是一个任务,是一个包装器,可以把callable转化成Future和Runnable,实现了runnable和Future这两个接口
所以,它几个可以作为Runnable被线程执行,也可以作为Future得到Callable的返回值
这里就巧妙的运用它的特性,首先将Callable实例当作参数生成FutureTask对象,此时,把FutureTask当作Runnable,通过线程或者线程池运行它,再通过FutureTask获取刚才的结果,也就是说,通过FutrueTask,两个变一个,一个Futrue代替Callable和Futrue
/**
* 演示futureTask的用法
*/
public class FutureTaskDemo {
public static void main(String[] args) {
Callable callable = () ->{
System.out.println("子线程正在计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 100; i++) {
sum +=i;
}
return sum;
};
FutureTask<Integer> integerFutureTask = new FutureTask<Integer>(callable);
//线程方法
//new Thread(integerFutureTask).start();
ExecutorService service = Executors.newCachedThreadPool();
service.submit(integerFutureTask);
try {
System.out.println("task运行结果"+integerFutureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
1、for循环批量获取future的时候,一部分线程慢,get方法调用需要用timeout限制
多个任务下。上面讲的案例,数组获取结果
那么线程池运行的任务 的先后顺序是不同的,假设5个任务,1号任务没完成,2345完成了,那么1号get的时候阻塞,后面的任务都是没法运行的,效率室友损耗的
两种方案,一是丢弃掉,get的时候,传入Timeout,太长了就丢弃它
还有就是利用compufbalefuture,某个子任务,先完成,就会运行先做好的任务
2、Future的生命周期无法后退,一旦完成,那么久就无法重新再来,和我们的人生一样~
上面讲了future和callable,提到了,可以用它来解决for循环时,线程池运行速度不一,出现阻塞等待的问题
上产中,常常用线程池和callable、future做异步任务,然而想要获取异步结果,不能像上面的例子一样直接报错啊,捕获,需要判断isDone来获取结果,通过轮询的方法,耗费cpu资源
public static void testFuture1() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(10);
//定义一个异步任务
Future<String> future = executorService.submit(()->{
Thread.sleep(3000);
return "我要去会所按摩";
});
//轮询获取结果,耗费的CPU资源
while (true){
if(future.isDone()) {
System.out.println(future.get());
break;
}
}
}
或者不做判断,直接get,则会阻塞线程,当然两种方式都是不友好的,占用资源
jdk8之后,引入的CompeltableFuture,帮助我们简化异步编程的复杂性,函数式编程也会让代码更加的简洁
可以在任务完成后做对应的callback回调处理
函数式编程不熟悉的,可以到我大数据专栏里看,那里讲解了
业务开发中的集合信息处理。业务需要从不同地方获取数据,再汇总处理,再返回调用方。做聚合信息处理
解决串行请求响应时间长的问题,通过CompletableFuture可以大大提示性能
多任务编排调度,也可用使用Completable完成
实现了Future和CompletionStage接口,相当于一个Task编排工具
CompletionStage时一个jdk8的新增接口,用于异步执行中的阶段处理,CompletionStage是一个实类
对任务处理构造一个传递链,包括异常处理、类型转换,传递过程中任何一个CompletionStage都可以对结果处理,所以就用到了函数式编程,将任务一步一步的去处理,编排
异步往往和线程池配合
在springboot中,@Async,通过的也是线程池
当前的Task再那个线程执行,由方法命名控制
xxxAsync表示异步,在线程池中执行,
没用指定线程池的情况下,使用CompletableFuture内部线程池ForkJoinPool,线程数默认是cpu核数
需要注意线程池在业务层面划分,避免慢因为慢的io操作影响整体性能
completableFuture是对Futrue异步特性的加强,实现了ComplationStage,结合线程池,构成了一个异步编程的工具,且可在各个阶段进行处理,利用到了函数式编程,解决了Future的复杂性,和一些不足,比如get方法阻塞问题。可以利用它做聚合信息处理,提高接口响应!
runAsync applyAsync
get() getNow
thenAccept thenApply thenRun
主要就是这几个方法,工作会用到
public class FutureTest {
public static void main(String[] args) throws Exception{
// testFuture2();
// testFuture3();
testFuture4();
}
/**
* 任务编排有返回值
* @throws Exception
*/
public static void testFuture4() throws Exception{
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "执行任务1");
return "先学会走路";
});
CompletableFuture<Void> future2 = future.thenAccept((ele) -> {
System.out.println("入参:"+ele);
System.out.println(Thread.currentThread().getName()+"执行任务2");
});
System.out.println("future2:"+future2.get());
}
/**
* 任务编排有返回值
* @throws Exception
*/
public static void testFuture3() throws Exception{
。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "执行任务1");
return "先学会走路";
});
CompletableFuture<String> future2 = future.thenApply((ele) -> {
System.out.println("入参:"+ele);
System.out.println(Thread.currentThread().getName()+"执行任务2");
return ele+"再学会跑步";
});
System.out.println("future2:"+future2.get());
}
/**
* 简单使用
* @throws Exception
*/
public static void testFuture2() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行");
return "我是你爸爸";
});
System.out.println(Thread.currentThread().getName()+":"+future.getNow("默认值"));
}
}
下面讲开发中会遇到的复杂场景,这里举的是,详情页,与浏览页的请求
要知道,浏览页基本信息和详情页,为了提高响应速度,通常分为主表和附属表,通过不同的接口去获取
比如我们点进去时候的详情,才会去查子表中的数据,做垂直分表。
这个案例,点击进去的时候,程序一般设计为并行的去请求这两个接口,但是,为了方便,都用这个案例来做哈
这个方法,为了组合多个completeFuture
刚才写的demo,里面,thenApply使用的时候,接受的是一个String的值,然而,在开发中,往往返回CompletableFuture类型,方便后续的操作,然而,这样就会出现嵌套问题。取返回的结果的时候,需要get两次
比如,thenApply的时候,接受的是一个procuct,我们return出去一个CompletableFuture<Product>
最后接受的时候,就是一个CompletableFuture<CompletableFuture<Product>>
用thenCompose组合起来,就不会有这个问题,
public class Product {
private int id;
private String title;
private String detail;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getDetail() {
return detail;
}
public void setDetail(String detail) {
this.detail = detail;
}
@Override
public String toString() {
return "Product{" +
"id=" + id +
", title='" + title + '\'' +
", detail='" + detail + '\'' +
'}';
}
}
class ProductDetailService{
private static final Map<Integer,String> map = new HashMap<>();
static {
map.put(1,"牙刷-详情图内容");
map.put(2,"内裤-详情图内容");
map.put(3,"手套-详情图内容");
map.put(4,"被子-详情图内容");
map.put(5,"杯子-详情图内容");
map.put(6,"毛巾-详情图内容");
map.put(7,"媳妇-详情图内容");
}
public String getById(int id){
try {
Thread.sleep(1000);
System.out.println("DetailService # getById方法运行线程:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return map.get(id);
}
}
class ProductService{
private static final Map<Integer,String> map = new HashMap<>();
static {
map.put(1,"牙刷");
map.put(2,"内裤");
map.put(3,"手套");
map.put(4,"被子");
map.put(5,"杯子");
map.put(6,"毛巾");
map.put(7,"媳妇");
}
public String getById(int id){
try {
Thread.sleep(1000);
System.out.println("ProductService#getById方法运行线程:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return map.get(id);
}
}
class Test{
public static void main(String[] args) throws Exception {
// testEmbedFuture();
testThenCompose();
}
/**
* 多个completableFuture,不用thenCompnse
* @throws Exception
*/
public static void testEmbedFuture() throws Exception{
ProductService productService = new ProductService();
ProductDetailService detailService = new ProductDetailService();
int id = 1;
CompletableFuture<CompletableFuture<Product>> future = CompletableFuture.supplyAsync(() -> {
String title = productService.getById(id);
Product product = new Product();
product.setTitle(title);
product.setId(id);
return product;
}).thenApply(new Function<Product, CompletableFuture<Product>>() {
@Override
public CompletableFuture<Product> apply(Product product) {
return CompletableFuture.supplyAsync(() -> {
//用到上一步的结果
String detail = detailService.getById(id);
product.setDetail(detail);
return product;
});
}
});
System.out.println(future.get().get());
}
/**
* 用thenCompose
* @throws Exception
*/
public static void testThenCompose() throws Exception {
ProductService productService = new ProductService();
ProductDetailService detailService = new ProductDetailService();
int id = 1;
CompletableFuture<Product> thenCompose = CompletableFuture.supplyAsync(() -> {
String title = productService.getById(id);
Product product = new Product();
product.setId(id);
product.setTitle(title);
return product;
}).thenCompose(new Function<Product, CompletionStage<Product>>() {
@Override
public CompletionStage<Product> apply(Product product) {
return CompletableFuture.supplyAsync(() -> {
String detail = detailService.getById(product.getId());
product.setDetail(detail);
return product;
});
}
});
System.out.println(thenCompose.get());
}
}
和上面类似,都是处理两个future,不过这个是部分先后顺序。
这个,就很常用了,上面都是两个任务的,通过allOf,就可以实现多个任务,,通过join,全执行完了再返回
public static void testAllOf()throws Exception{
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("future1完成");
return "future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("future1完成");
return "future2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("future1完成");
return "future3";
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
System.out.println("begin"+ LocalDateTime.now());
//阻塞等待, 全部任务完成
allOf.join();
if(allOf.isDone()){
System.out.println("全部任务完成");
}
System.out.println("end-"+LocalDateTime.now());
}
和allOf的区别就是,anyOf是一个任务完成,就可以使用了,用的场景不如allOf多,
比如,几个接口一样,为了获得更快的数据,就可以采用anyOf
public static void testAllOf()throws Exception{
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("future1完成");
return "future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("future1完成");
return "future2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("future1完成");
return "future3";
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
System.out.println("begin"+ LocalDateTime.now());
//阻塞等待, 全部任务完成
allOf.join();
if(allOf.isDone()){
System.out.println("全部任务完成");
}
System.out.println("end-"+LocalDateTime.now());
}
模拟工作中,请求合并接口,通过complatableFuture减少连接数,处理更多链接
也类似爬虫业务多个URL并行爬取和解析
商品详情页组装,如SKU。主图、评价等,
public class EduService {
public String getRank() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "rank info";
}
public String getCategory() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Category info";
}
public String getBanner() {
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Banner info";
}
public String getVideoCard() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Video Card info";
}
}
//测试
class TestFunction{
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16,
32,100, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
public static Map<String,Object> homePageAggApi()throws Exception{
Map<String,Object> homePageInfo = new HashMap<>();
//模拟不同的服务调用
EduService eduService = new EduService();
CompletableFuture<Void> bannerFuture = CompletableFuture.runAsync(() -> {
String banner = eduService.getBanner();
homePageInfo.put("banner", banner);
}, executor);
CompletableFuture<Void> categoryFuture = CompletableFuture.runAsync(() -> {
String category = eduService.getCategory();
homePageInfo.put("category",category);
}, executor);
CompletableFuture<Void> rankFuture = CompletableFuture.runAsync(() -> {
String rank = eduService.getRank();
homePageInfo.put("rank",rank);
}, executor);
CompletableFuture<Void> videoCardFuture = CompletableFuture.runAsync(() -> {
String videoCard = eduService.getVideoCard();
homePageInfo.put("videoCard",videoCard);
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(bannerFuture, categoryFuture, rankFuture, videoCardFuture);
//join()和get()方法都是阻塞调用它们的线程(通常为主线程)用来获取CompletableFuture异步之后的返回值
allOf.get();
return homePageInfo;
}
public static void main(String[] args) throws Exception {
System.out.println("begin:"+ LocalDateTime.now());
Map<String, Object> homePage = homePageAggApi();
System.out.println(homePage.toString());
System.out.println("end:"+LocalDateTime.now());
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。