前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >你应该使用Java8 非阻塞异步API来优化你的系统了

你应该使用Java8 非阻塞异步API来优化你的系统了

作者头像
程序猿DD
发布2020-12-18 16:34:10
8270
发布2020-12-18 16:34:10
举报
文章被收录于专栏:程序猿DD

非同步和非阻塞

什么是非同步?
  • 异步执行
  • 不是同步的方式运行,或者不是按照你描述的顺序发生。
什么是非阻塞
  • 不是阻塞的
  • 不会造成线程的阻塞
为什么需要异步呢?
  • 业务方法太耗时间
  • 网络开销
  • 加解密操作
  • 文件上传下载
  • ......
同步方式有什么坏处?
  • Web 服务,因为执行某些过长的线程长时间占用线程,则你的服务吞吐量严重降低。
  • 桌面或者手机的应用,执行可能会卡顿,等待服务的请求耗时。
传统的阻塞业务处理示例
代码语言:javascript
复制
// 10 seconds
Image img1 = download();
render(img1);
//  12 seconds
Image img2 = download();
render(img2);

这些业务方代码很耗费时间,并且传统的写法,每个方法操作控制起来非常不方便。

Java 8 之前的做法

  • java.lang.Thread
  • JDK1.0
对于上述的示例代码基于JDK8 Consumer 的实现
代码语言:javascript
复制
void downloadAsync(String url,Consumer<Image> c) {
  new Thread(() -> {
    Image result = download(url);
    c.accept(result);
  } ).start();
}
这样实现好么?使用Thread 的这种方式存在什么缺点?
  • 使用Thread 的方式经常需要配合 synchronized,wait,notify 和 join
  • 不同Thread 之间如何存取同一份数据?
  • 如何管控?
  • 如何进行业务方法之间的组合和依赖?
如何对方法之间进行依赖处理示例:
代码语言:javascript
复制
fetchDataAsync (data -> {
  downloadAsync(data , image -> render(image));
});

上述代码可以实现我们想要的结果,但是不推荐,Thread 并没有进行相关的方法组合、依赖API,这种实现方式,到后边基本就成了回调地狱。

避免回调,但是线程之间的结果还是要前后依赖,我们也可以这样实现:
代码语言:javascript
复制
new Thread(() -> {
  final Data result = fetchData();
  Image img = download(result.imageURL);
  Bitmap bitmap = decode(img);
}).start();

上述方式,其实就是把三个线程的返回结果包裹在一个大的Thread 中,这种方式的确可以做,但是还是不够优雅。这样子导致外层的这个Thread 非常大。综上,两种实现方式总结如下:

  • 组合各种非同步方法,写起来还是变成了回调地狱
  • 包一个外层的Thread 执行,如果忘记外层包裹怎么办?如何控制线程资源?
各个线程更复杂的组合怎么办?
  • 如果想要两个线程的任务结果都执行完毕 可以使用Thread#join 来实现
  • 如果只要任意一个结果有返回就可以继续往下运行怎么做?可以使用Thread#join(long mills) 和检查结果值,或者浪费一个Thread 一直去做值的检查工作。代码实现如下:
代码语言:javascript
复制
while(true) {
t1.join(1000);
if (value != null) {
  retrn t1Value;
}
t2.join(1000);
...
}

所以综合分析,直接使用 Thread 根本不靠谱。

新的魔法 - Java1.5+ Future

  • java.util.concurrent.Future
  • java se 5.0
  • 可以将 Future 看做一个一个等待结果的容器,让我们可以尝试去获得结果 示例如下:
代码语言:javascript
复制
ExecutorService service  = Executors.newCacheThreadPool();
Future<Image> f = service.submit(() -> downloadImage(xxx));
// 做些其他事情
// f.get() 得到结果

Future 异常处理

代码语言:javascript
复制
try {
  renderImage(future.get());
} catch (Exception e) {
  e.printCause(); // 打印执行时的错误
}

Future 其他方便的方法

代码语言:javascript
复制
// 取消某个工作
future.cancel(boolean);
// 判断是否取消
future.isCancelled();
// 工作是否完成
future.isDone();

但是 Future 还是有问题,特点如下:

  • 传统 callback 的方式,变成外部可以自行再做处理
  • 简单易懂
  • 只有5个方法
  • 阻塞式 API 来取得回传
  • 不易组合再利用

1.8 终极大法 j.u.c.CompletableFuture

  • java.util.concurrent
  • Java SE 8
  • implements Future, CompletionStage 示例:
代码语言:javascript
复制
CF<Stirng> cf = CompletableFuture.completableFuture("Value");
String result = cf.get();
 // 阻塞等待结果
String result = cf.join();

// 非阻塞等待结果输出
cf.thenAccept(s -> System.out.println(s)); 

String load() {...}
// 非阻塞等待结果
CF<Stirng> cf = CompletableFuture.supplyAsync(() -> load());
// 非阻塞等待结果,并且指定使用某个线程池执行
CF<Stirng> cf = CompletableFuture.supplyAsync(() -> load() , executorService);

CF<Stirng> cf = ...;
CF<Integer> length = cf.thenApply(data -> data.length());
cf1 = cf.thenApplyAsync(...);
cf2 = cf.thenApplyAsync(...);

CF<Stirng> cf = new CompletableFuture();
cf.thenAccept(s -> System.out.println(s););

CF<Stirng> cf = new CompletableFuture();
executor.submit(() -> {
String result = load();
cf.complete(result);
});

// 异常处理
executor.submit(() -> {
try {
String result = load();
cf.complete(result);
} catch (Exception e) {
  cf.completeExceptionally(e);
}
});

cf.whenComplete(
(String s, Throable t) -> {
  if (s != null) { System.out.println(s); } 
  else  System.err.println(t); 
}
);

比如目前存在这样的业务:先查找用户,查找到用户之后,再下载用户的头像图片,此时代码可以写成如下的方式:

代码语言:javascript
复制
CF<?> cf = findUser(1L).thenApply(user -> download(user));

CF<File> cf = findUser(1L).thenCompose(user -> download(user));
更多的操作串下去呢?
代码语言:javascript
复制
CF<File> cf  = findUser(1L).thenCompose(user -> download(user))
.thenCompose(img -> save(img));
如果串起来的异步方法出现异常如何处理?
代码语言:javascript
复制
 findUser(1L).thenApply(...)
.thenApply(...)   // exception 处理
.thenCompose(...)
.whenComplete(...);
CompletableFuture#allOf
代码语言:javascript
复制
CF<String> api1 = ...;
CF<String> api2 = ...;
CF<String> api3 = ...;
CF<Void> all = CompletableFuture.allOf(api1,api2,api3);

// 等待3个操作所有返回值
CF<List<Strnig>> result = all.thenApply(
v -> Arrays.asList(api1.get(), api2.get(), api3.get());
);
CompletableFuture#anyOf
代码语言:javascript
复制
// 等待其中任意一个结果返回
CF<Object> all =  CompletableFuture.anyOf(api1,api2,api3);

API 中常见的行为

代码语言:javascript
复制
CF<User> findUser(String id );
CF<User> saveUser(String id );
CF<User> downloadAvatar(String id);

findUser(...)
.thenCompoe(user -> saveUser(user.id))
.thenCompose(user -> downloadAvatar(user.id))
.thenAccept(img -> render(img));
CompletableFuture 多 key 查询
代码语言:javascript
复制
CF<Value> executeQuery(String id);
List<CF<Value>> queries = ids.stream()
.map(id -> executeQuery(id))
.collect(toList());
// using allOf to let
// List<CF<Value>> -> CF<List<Value>>
CF<Void> all = CF.allOf(queies.toArray());

CF<List<Value>> result  = all.thenApply(v -> queies.stream().map(q -> q.join())
.collect(toList)
);

CompletableFuture 像极了 Data Flow

代码语言:javascript
复制
getOrderFromNetwork ---> listProducts --> sendMail 

CompletableFuture 优点

  • 事件驱动
  • 容易组合
  • 控制权可以交给呼叫者
  • 减少Thread 的浪费

CompletableFuture 缺点

  • Java8 中 Future/Promise 的混合,不少语言是分开的
  • 爆多的方法数量
  • 60+ 方法

注意

  • CompletableFuture#cancel 方法不能取消正在执行的工作
  • 尽量使用 Async 结尾的API

支持非同步的 WEB 框架

  • Servlet 3.x+ AsyncContext
  • SpringFramework Controller 的回传值直接用 CompletableFuture
  • Play Framework
  • Asynchronous web framework
  • play.libs.F.Promise

思考一下 Web application

  • 该不该用处理 http 的 thread 做事?
  • Tomcat 有 max-threads 设定
  • Play 本来就是 http 跟 worker 分离
  • 每个要求的工作时间不一定相同
  • 花多少时间?占多少比例?
  • 花时间的工作有没有资源存取上限?

同步/异步请求简单测试

  • Job:1500ms ~ 30%, 100 ms ~ 70%
  • Tomcat max-threads 200
  • ab -n 1000 -c 400
  • Async ~ 375 requests/second
  • Sync ~ 300 requests/second 如果方法处理速度很快,则传统写法会比异步方式更好。因为异步操作需要更多的操作和等待。

Reactive 编程

  • Data Flow
  • Java9 Flow API 支持

推荐关注本文作者

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿DD 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 非同步和非阻塞
    • 什么是非同步?
      • 什么是非阻塞
        • 为什么需要异步呢?
          • 同步方式有什么坏处?
            • 传统的阻塞业务处理示例
            • Java 8 之前的做法
              • 对于上述的示例代码基于JDK8 Consumer 的实现
                • 这样实现好么?使用Thread 的这种方式存在什么缺点?
                  • 如何对方法之间进行依赖处理示例:
                • 避免回调,但是线程之间的结果还是要前后依赖,我们也可以这样实现:
                  • 各个线程更复杂的组合怎么办?
                  • 新的魔法 - Java1.5+ Future
                  • 1.8 终极大法 j.u.c.CompletableFuture
                    • 更多的操作串下去呢?
                      • 如果串起来的异步方法出现异常如何处理?
                        • CompletableFuture#allOf
                          • CompletableFuture#anyOf
                          • API 中常见的行为
                            • CompletableFuture 多 key 查询
                            • CompletableFuture 像极了 Data Flow
                            • CompletableFuture 优点
                            • CompletableFuture 缺点
                            • 注意
                            • 支持非同步的 WEB 框架
                            • 思考一下 Web application
                            • 同步/异步请求简单测试
                            • Reactive 编程
                            相关产品与服务
                            容器服务
                            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档