Resilience4j 是受 Netflix Hystrix 启发的轻量级容错库,但专为 Java 8 和函数式编程而设计。轻巧,因为该库仅使用 Vavr,而 Vavr 没有任何其他外部库依赖项。相比之下,Netflix Hystrix 对 Archaius 具有编译依赖性,而 Archaius 具有更多的外部库依赖性,例如 Guava 和 Apache Commons Configuration。另外,Netflix Hystrix 目前处于维护状态,不在主动开发,SpringCloud 在 2020 版本后,已经移除了 spring-cloud-netflix 相关模块,容错这块也推荐使用 Resilience4j。
Resilience4j 提供了通过装饰器的方式,以使用断路器,速率限制器,重试或隔板来增强任何功能接口,lambda 表达式或方法引用。您可以在任何功能接口,lambda 表达式或方法引用上堆叠多个装饰器来做熔断、限流等动作。
容错是指系统在部分组件(一个或多个)发生故障时仍能正常运作的能力。要具有这个能力,通常要包含断路器(CircuitBreaker)、并发调用隔离(Bulkhead)、限流(RateLimiter)、重试(Retry)、超时(Timeout)机制。
断路器一般通过 3 个有限状态机来实现,CLOSED、OPEN、HALF_OPEN。此外,还有 2 个特殊的状态机,DISABLED 和 FORCED_OPEN。状态的存储更新必须是线程安全的,即只有一个线程能够在某个时间点更新状态。
滑动窗口:
断路器使用滑动窗口来存储和汇总调用结果,有两种选择。基于计数的滑动窗口 Count-based 和基于时间的滑动窗口 Time-based。
基于计数的滑动窗口:汇总最近 N 次调用的结果。
基于时间的滑动窗口:汇总最近 N 秒的调用结果。
相关配置:请查看附录 CircuitBreaker 配置。
在系统设计中,需要预期故障的发生,将应用程序拆分成多个组件,通过资源隔离确保一个组件的故障不会影响其他的组件。例如:
生活:就像轮船用隔板(Bulkhead)分成多个小隔间,每个隔间都被隔板密封,这样可以防止洪水时整艘船沉没。
系统:两个服务 A 和服务 B,A 的某些 API 依赖 B,当服务 B 运行速度非常慢的时候,A 调用 B 的请求变多时,A 的性能会受到影响,服务 A 中那些不依赖于服务 B 的功能也无法处理。因此,需要隔离资源专门处理服务 A 依赖服务 B 的调用请求。
并发调用的隔离一般有两种方式来实现:信号量 Semaphore 和线程池 ThreadPool。Resilience4j 提供了 SemaphoreBulkhead 和 FixedThreadPoolBulkhead 来实现 Bulkhead。
相关配置:请查看附录 Bulkhead 配置。
流量控制是确保服务的高可用性和可靠性的重要技术。流控的场景,服务 A 依赖服务 B,服务 A 有 3 个实例,服务 B 会为了接收到请求做大量的 CPU / IO 密集工作,因此服务 B 在给定的时间范围内设置可以处理的最大请求数的限制。
设置流控后
流控和断路器的区别
流控:速率限制器通过控制吞吐量来帮助保护服务器免于过载。
断路器:当目标服务器出现故障/无响应时,Circuit Breaker 有助于保持客户端的安全和正常运行。
相关配置:请参考附录 RateLimiter 配置
微服务体系中,多个服务互相依赖,当被依赖的服务出现问题而无法按预期响应时,就会级联到下游服务,导致不良的用户体验。
同样,在微服务体系中,一个服务会有多个实例,如果其中一个实例可能有问题,并且无法正确响应我们的请求,则如果我们重试该请求,则负载均衡器可以将请求发送到运行状况良好的节点并正确获得响应。通过重试,有更多机会获得正确的响应。
相关配置:请参考附录 Retry 配置
在微服务体系中,服务间相互依赖,例如:A—>B—>C—>D,可能由于某些网络原因,导致被依赖服务 D 无法按预期响应,这种缓慢会导致下游服务一直到服务 A,并且阻塞单个服务中的线程。由于这不是很常见的问题,在设计时需要设置超时来应对服务缓慢/不可用性问题。
相关配置:请参考附录 Timeout 配置
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.6.1</version>
</dependency>
配置-服务调用方
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.6.1</version>
</dependency>
代码-服务提供方,模拟异常
@GetMapping("/rating_random_fail/{productId}")
public ResponseEntity<ProductRatingDTO> getRatingRandomFail(@PathVariable Integer productId) {
ProductRatingDTO productRatingDTO = ratingService.getRatingForProduct(productId);
return failRandomly(productRatingDTO);
}
/**
* 模拟服务随机失败
*
* @param productRatingDTO
* @return
*/
private ResponseEntity<ProductRatingDTO> failRandomly(ProductRatingDTO productRatingDTO){
int random = ThreadLocalRandom.current().nextInt(1, 4);
log.info("[服务端模拟重试场景,数字] -> {}", random);
if(random < 2){
return ResponseEntity.status(500).build();
}else if(random < 3){
return ResponseEntity.badRequest().build();
}
return ResponseEntity.ok(productRatingDTO);
}
服务调用方-重试
private static int retryCount; // 记录重试次数,进行验证
/**
* 服务端模拟随机失败,客户端实现重试
*
* @param productId
* @return
*/
@Retry(name = "ratingRetryService", fallbackMethod = "getDefaultProductRating")
public CompletionStage<ProductRatingDTO> getProductRatingDto(int productId){
retryCount++;
log.info("[重试模拟 {}],开始调用 {}", retryCount, Instant.now());
Supplier<ProductRatingDTO> supplier = () ->
this.restTemplate.getForEntity(this.ratingEndpoint + productId, ProductRatingDTO.class).getBody();
return CompletableFuture.supplyAsync(supplier);
}
/**
* 客户端失败回调方法
*
* @param productId
* @param throwable
* @return
*/
private CompletionStage<ProductRatingDTO> getDefaultProductRating(int productId, HttpClientErrorException throwable) {
retryCount = 0;
log.info("[重试模拟 {} ],进入回调方法.", retryCount);
return CompletableFuture.supplyAsync(() -> ProductRatingDTO.of(0, Collections.emptyList()));
}
resilience4j:
bulkhead:
instances:
ratingBulkheadService:
max-concurrent-calls: 5 ## 隔板最大的信号量
max-wait-duration: 10ms
/**
* 服务提供者 — 模拟服务端处理缓慢
*
* @param productId
* @return
* @throws InterruptedException
*/
@GetMapping("/rating_slow_response/{productId}")
public ResponseEntity<ProductRatingDTO> getRatingSlowResponse(@PathVariable Integer productId) throws InterruptedException {
TimeUnit.SECONDS.sleep(10L);
return ResponseEntity.ok(ratingService.getRatingForProduct(productId));
}
服务调用方
/**
* 服务端模拟响应缓慢,客户端设置并发隔板
*
* @param productId
* @return
*/
@Bulkhead(name = "ratingBulkheadService", type = Type.SEMAPHORE, fallbackMethod = "getDefault")
public ProductRatingDTO getProductRatingDtoBulkhead(int productId) {
log.info("[重试并发隔板 {}],调用开始。", Instant.now());
ProductRatingDTO productRatingDTO = this.restTemplate
.getForEntity(this.productEndpoint + "/rating_slow_response/" + productId, ProductRatingDTO.class)
.getBody();
log.info("[重试并发隔板 {}],调用结束。", Instant.now());
return productRatingDTO;
}
/**
* 客户端失败回调方法
*
* @param productId
* @param throwable
* @return
*/
private ProductRatingDTO getDefault(int productId, Throwable throwable) {
log.info("==> 进入回调方法.");
return ProductRatingDTO.of(0, Collections.emptyList());
}
resilience4j:
ratelimiter:
instances:
productRateLimiter:
limitForPeriod: 3 ## 每10秒内可用3个
limitRefreshPeriod: 10s
timeoutDuration: 0
/**
* 根据 productId 获取商品 — 模拟流量控制
*
* @param productId
* @return
*/
@RateLimiter(name = "productRateLimiter", fallbackMethod = "getProductByIdFallback")
public BaseResponse<ProductDTO> getProductByIdRateLimiter(int productId) {
ProductPO po = this.map.get(productId);
ProductDTO productDTO = ProductDTO.of(po.getProductId(), po.getDescription(), po.getPrice(), null);
return BaseResponse.of(productDTO, ResponseType.SUCCESS, Strings.EMPTY);
}
private BaseResponse<ProductDTO> getProductByIdFallback(int productId, Throwable throwable) {
return BaseResponse.of(null, ResponseType.FAILURE, "当前用户较多,请稍后再试。");
}
resilience4j:
timelimiter:
instances:
ratingTimeoutService:
timeout-duration: 3s ## 3秒超时
cancel-running-future: true ## 超时后取消正在执行的线程任务
/**
* 服务提供者 — 重试服务端网络抖动
*
* @param productId
* @return
* @throws InterruptedException
*/
@GetMapping("/rating_timeout/{productId}")
public ResponseEntity<ProductRatingDTO> getRatingTimeout(@PathVariable Integer productId) throws InterruptedException {
int second = ThreadLocalRandom.current().nextInt(1, 5);
log.info("[服务端模拟超时场景,超时 {} 秒]", second);
TimeUnit.SECONDS.sleep(second);
return ResponseEntity.ok(ratingService.getRatingForProduct(productId));
}
服务调用方
/**
* 服务端模拟随机失败,客户端实现超时机制
*
* @param productId
* @return
*/
@TimeLimiter(name = "ratingTimeoutService", fallbackMethod = "getDefaultTimeout")
public CompletionStage<ProductRatingDTO> getProductRatingDtoTimeout(int productId) {
log.info("[超时模拟],开始调用 {}", Instant.now());
Supplier<ProductRatingDTO> supplier = () -> this.restTemplate
.getForEntity(this.productEndpoint + "/rating_timeout/" + productId, ProductRatingDTO.class)
.getBody();
return CompletableFuture.supplyAsync(supplier);
}
/**
* 客户端超时回调方法
*
* @param productId
* @param throwable
* @return
*/
private CompletionStage<ProductRatingDTO> getDefaultTimeout(int productId, Throwable throwable){
log.info("[超时模拟 {} ],进入回调方法.");
return CompletableFuture.supplyAsync(() -> ProductRatingDTO.of(0, Collections.emptyList()));
}
resilience4j:
circuitbreaker:
configs:
default:
sliding-window-type: count-based
sliding-window-size: 100
permitted-number-of-calls-in-half-open-state: 10 ## 在半开状态时,允许调用的数量
wait-duration-in-open-state: 10ms ## 从打开状态转变为半开状态等待的时间
failure-rate-threshold: 60 ## 失败率阀值,百分比
record-exceptions:
- org.springframework.web.client.HttpServerErrorException
instances:
ratingCircuitBreakService:
base-config: default
retry:
instances:
ratingCircuitBreakService:
max-attempts: 2 ## 最多重试3次
wait-duration: 1s ## 每次重试调用前,等待2秒
retry-exceptions:
- org.springframework.web.client.HttpServerErrorException
ignore-exceptions:
- org.springframework.web.client.HttpClientErrorException
/**
* 服务提供者 — 模拟熔断场景
*
* @param productId
* @return
* @throws InterruptedException
*/
@GetMapping("/rating_circuit_break/{productId}")
public ResponseEntity<ProductRatingDTO> getRatingCircuitBreakResponse(@PathVariable Integer productId) throws InterruptedException {
ProductRatingDTO productRatingDTO = ratingService.getRatingForProduct(productId);
return circuitBreakFailRandomly(productRatingDTO);
}
/**
* 模拟熔断场景
*
* @param productRatingDto
* @return
* @throws InterruptedException
*/
private ResponseEntity<ProductRatingDTO> circuitBreakFailRandomly(ProductRatingDTO productRatingDto) throws InterruptedException {
// 模拟响应延迟
TimeUnit.MILLISECONDS.sleep(100L);
// 模拟响应失败
int random = ThreadLocalRandom.current().nextInt(1, 4);
if(random < 3) {
return ResponseEntity.status(500).build();
}
return ResponseEntity.ok(productRatingDto);
}
服务调用方
/**
* 服务端模拟响应延迟、响应失败,客户端设置熔断机制
*
* @param productId
* @return
*/
@Retry(name = "ratingCircuitBreakService", fallbackMethod = "getDefault")
@CircuitBreaker(name = "ratingCircuitBreakService", fallbackMethod = "getDefault")
public ProductRatingDTO getProductRatingDtoCircuitBreak(int productId) {
log.info("[熔断 {}],调用开始。", Instant.now());
ProductRatingDTO productRatingDTO = this.restTemplate
.getForEntity(this.productEndpoint + "/rating_circuit_break/" + productId, ProductRatingDTO.class)
.getBody();
log.info("[熔断 {}],调用结束。", Instant.now());
return productRatingDTO;
}
/**
* 客户端失败回调方法
*
* @param productId
* @param throwable
* @return
*/
private ProductRatingDTO getDefault(int productId, Throwable throwable) {
log.info("==> 进入回调方法.");
return ProductRatingDTO.of(0, Collections.emptyList());
}
参考资料:
本文转载自:金科优源汇(ID:jkyyh2020)
原文链接:Resilience4j 实用指南
领取专属 10元无门槛券
私享最新 技术干货