Running with Spring Boot v2.4.5, Spring Retry v1.3.1
在与外部系统交互时,由网络抖动亦或是外部系统自身的短暂性问题触发的瞬时性故障是一个绕不过的坑,而重试可能是一个比较有效的避坑方案;但有一点需要特别注意:外部系统的接口是否满足幂等性,比如:尽管调用外部系统的下单接口超时了,但外部系统订单数据可能已经落库了,这个时候再重试一次,外部系统内的订单数据可能就重复了!
Spring Retry
为Spring应用提供了重试功能,同时支持声明式重试
(Declarative Retry)和编程式重试
(Programmatic Retry)两种风格;此外,其不仅对业务代码无侵入性,而且还支持重试配置;但Spring Retry的重试决策机制大多是基于Throwable
的,尚不支持基于返回结果来进行重试决策。
RetryContext
记录了已重试次数、上一次重试所触发的异常以及context.state
、context.name
、context.exhausted
、context.closed
、context.recovered
等属性。
public interface RetryContext extends AttributeAccessor {
String NAME = "context.name";
String STATE_KEY = "context.state";
String CLOSED = "context.closed";
String RECOVERED = "context.recovered";
String EXHAUSTED = "context.exhausted";
boolean isExhaustedOnly();
RetryContext getParent();
int getRetryCount();
Throwable getLastThrowable();
}
RetryOperations
接口定义了四个execute()
方法,无论是声明式重试还是编程式重试,都是通过execute()来进行重试操作的!
public interface RetryOperations {
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState) throws E, ExhaustedRetryException;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E;
}
Classifier
接口只有一个classify()
方法,主要用于将给定类型对象C
分类为另一类型的对象T
。
public interface Classifier<C, T> extends Serializable {
T classify(C classifiable);
}
SubclassClassifier<C, T>
内部维护了一个ConcurrentHashMap<Class<? extends C>, T>
,用于存储待分类对象C
与分类后对象T
的映射关系。其分类逻辑如下:
BinaryExceptionClassifier<Throwable, Boolean>
继承自SubclassClassifier,主要用于将异常分类为两种类型:true
或false
,这也正符合其二元异常分类器的定位。BinaryExceptionClassifier不仅沿用了SubclassClassifier的分类逻辑,同时还进一步拓展:首先通过getCause()
方法一层一层地获取待分类异常的原因,然后以这些Throwable类型的原因作为key,来寻找映射关系。
RetryPolicy
接口定义了四个方法,canRetry()
方法用来判决是否可以重试;open()
方法用来分配RetryContext;registerThrowable()
方法用于在每次重试失败后注册异常。
public interface RetryPolicy extends Serializable {
boolean canRetry(RetryContext context);
RetryContext open(RetryContext parent);
void close(RetryContext context);
void registerThrowable(RetryContext context, Throwable throwable);
}
为了应对瞬时性故障,重试操作往往应该间隔一段时间后执行,BackOffPolicy
接口即是对重试间隔的抽象;backOff()
方法是BackOffPolicy接口的核心,在Spring Retry 1.1 前其底层依托于Object#wait()
方法来模拟重试间隔,在在Spring Retry 1.1 后依托于Thread#sleep()
方法来模拟重试间隔。
public interface BackOffPolicy {
BackOffContext start(RetryContext context);
void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;
}
minBackOffPeriod + random.nextInt(maxBackOffPeriod - minBackOffPeriod)
interval * multiplier
(interval * multiplier) * (1 + r.nextFloat() * (multiplier - 1))
为了进一步增强业务方法的健壮性,我们可以通过实现RecoveryCallback
回调接口来封装一个兜底逻辑;这样在重试已耗尽且业务方法依然执行失败的时候,就会执行该兜底逻辑。
public interface RecoveryCallback<T> {
T recover(RetryContext context) throws Exception;
}
个人感觉重试监听器是一个鸡肋的功能,在实际工作中,基本用不到重试监听器,我们重点关注RetryListener
接口中open()
、close()
、onError()
这三个方法的调用时机即可。
public interface RetryListener {
// 在整个重试之前执行
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);
// 在整个重试之后执行
<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
// 在每次重试失败后执行
<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
}
在Spring Retry中,重试可以分为:无状态重试
(Stateless Retry)与有状态重试
(Stateful Retry)。在无状态重试中,每次进入RetryTemplate#execute()
方法内的while循环
前,一定会生成一个全新的RetryContext
实例。但在有状态重试中,第一次进入RetryTemplate#execute()
方法内的while循环
前,首先会生成一个全新的RetryContext
实例,然后根据目标方法生成一个key,最后将key与RetryContext实例组成键值对保存在RetryContextCache
中;那么在下次进入RetryTemplate#execute()
方法内的while循环
前,就不再生成RetryContext实例了,而是使用RetryContextCache中的RetryContext实例。换句话说,无状态重试中的RetryContext实例是保存在栈内,而有状态重试中的RetryContext实例是保存在堆内。
相信大家和笔者一样,在工作中多使用无状态重试,但其实有状态重试也是有用武之地的,比如:事务回滚和熔断器。
在事务回滚场景中,当目标方法(业务方法)抛出特定异常时,重试变得没有意义了,需要立即从execute()方法内的while循环语句内重新抛出该异常,从而进行事务回滚操作,重新抛出异常代码如下所示:
if (shouldRethrow(retryPolicy, context, state)) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
在熔断器场景中,CircuitBreakerRetryPolicy一般与SimpleRetryPolicy或MaxAttemptsRetryPolicy组合使用,前者委派后者来进行重试决策,但需要明确一点:目标方法每次只会在execute()方法内的while循环中执行一次(目标方法的每一次执行都是在不同的线程中完成的),之后借助于如下代码立即退出while循环:
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
RetryState
接口用于标识无状态重试与有状态重试,具体地,在调用execute()方法时,若RetryState参数为null,则意味着无状态重试,否则为有状态重试。RetryState接口定义了三个方法:
getKey()
,生成RetryContext实例的全局唯一标识;isForceRefresh()
,若值为true
,则每次都重新生成RetryContext实例,若值为false
,则从RetryContextCache中查询RetryContext实例;rollbackFor()
,标识该异常是否需要回滚。public interface RetryState {
Object getKey();
boolean isForceRefresh();
boolean rollbackFor(Throwable exception);
}
属性 | 描述 | 数据类型 | 默认值 |
---|---|---|---|
value | 重试间隔时间,若delay>0,则忽略value值,否则选用value值 | long | 1000 ms |
delay | 重试间隔时间,若delay=0时,则选用value值;否则选用delay值 | long | 0 ms |
maxDelay | 最大重试间隔时间,若maxDelay<delay,则maxDelay=30000 ms | long | 0 ms |
multiplier | 乘数,若multiplier=0,则忽略;若multiplier>0,则用于生成下一次重试间隔时间 | double | 0 |
delayExpression | 重试间隔时间表达式,一般用于从配置文件中加载delay值 | String | "" |
maxDelayExpression | 最大重试间隔时间表达式,一般用于从配置文件中加载maxDelay值 | String | "" |
multiplierExpression | 乘数表达式,一般用于从配置文件中加载multiplier值 | String | "" |
属性 | 描述 | 数据类型 | 默认值 |
---|---|---|---|
recover | 兜底方法 | String | "" |
interceptor | 重试拦截器MethodInterceptor | String | "" |
value | 可重试异常白名单 | Class<? extends Throwable>[] | {} |
include | 可重试异常白名单 | Class<? extends Throwable>[] | {} |
exclude | 可重试异常黑名单 | Class<? extends Throwable>[] | {} |
stateful | 有状态重试或无状态重试 | boolean | false |
maxAttempts | 最大重试次数 | int | 3 |
maxAttemptsExpression | 最大重试次数表达式,一般用于从配置文件中加载maxAttempts值 | String | "" |
backoff | 退避策略 | Backoff | @Backoff() |
listeners | 重试监听器 | String[] | {} |
@Recover
是一个标记接口,并没有定义相关属性。但需要注意:兜底方法的第一个参数是可选的,若存在,则只能是Throwable
及其子类,其余参数必须与目标方法中的参数一致,包括:类型和顺序。
属性 | 描述 | 数据类型 | 默认值 |
---|---|---|---|
value | 可重试异常白名单 | Class<? extends Throwable>[] | {} |
include | 可重试异常白名单 | Class<? extends Throwable>[] | {} |
exclude | 可重试异常黑名单 | Class<? extends Throwable>[] | {} |
maxAttempts | 最大重试次数 | int | 3 |
maxAttemptsExpression | 最大重试次数表达式,一般用于从配置文件中加载maxAttempts值 | String | "" |
resetTimeout | 熔断器重置超时时间,如果熔断器开启时间大于resetTimeout值,则在下一次调用目标方法时重置该熔断器,从而使得目标方法可以尝试从下游接口获得响应数据 | int | 20000 |
resetTimeoutExpression | 熔断器重置超时时间表达式 | String | "" |
openTimeout | 熔断器开始超时时间,若目标方法在(0,openTimeout)时间范围内重试次数耗尽,那么将自动打开熔断器,从而屏蔽目标方法对下游接口的访问 | int | 5000 |
openTimeoutExpression | 熔断器开始超时时间表达式 | String | "" |
GoogleSearchService
通过委派谷歌搜索接口来搜索给定内容。GoogleSearchService
public interface GoogleSearchService {
public void search(String content) throws IOException, InterruptedException, URISyntaxException;
}
GoogleSearchServiceImpl
public class GoogleSearchServiceImpl implements GoogleSearchService {
private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSearchServiceImpl.class);
@Override
public void search(String content) throws IOException, InterruptedException, URISyntaxException {
LOGGER.info(">>>>>> searching··· ");
URL url = new URL("https://www.google.com/complete/search?q=" + content);
URI uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null);
HttpRequest request = HttpRequest
.newBuilder()
.uri(uri)
.build();
HttpClient client = HttpClient
.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(3))
.build();
HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
}
}
需求
若search()
方法抛出HttpConnectTimeoutException异常,则重复执行search()
方法,最大重试次数为三次(包括初始执行),重试间隔为500毫秒;如果重试耗尽后,search()
方法依然抛出该异常,则打印错误日志信息。
下面分别从两个小节来展示编程式重试和声明式重试是如何实现上述需求的。
public class ProgrammaticSpringRetryApp {
private static final Logger LOGGER = LoggerFactory.getLogger(ProgrammaticSpringRetryApp.class);
public static void main(String[] args) throws Throwable {
GoogleSearchService googleSearchService = new GoogleSearchServiceImpl();
// 重试策略:CompositeRetryPolict,由MaxAttemptRetryPolicy和BinaryExceptionClassifierRetryPolicy组成
// 退避策略:FixedBackOffPolicy,重试间隔500毫秒
RetryTemplate retryTemplate = RetryTemplate
.builder()
.retryOn(HttpConnectTimeoutException.class)
.maxAttempts(3)
.fixedBackoff(500)
.build();
retryTemplate.execute(
new RetryCallback<Object, Throwable>() {
@Override
public Object doWithRetry(RetryContext context) throws Throwable {
googleSearchService.search("spring retry");
return null;
}
},
new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
LOGGER.error("调用谷歌搜索接口失败,执行兜底逻辑");
return null;
}
}
);
}
}
执行结果
2021:20:25.005 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
2021:20:25.015 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching···
2021:20:30.117 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
2021:20:30.117 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
2021:20:30.118 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching···
2021:20:33.641 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
2021:20:33.641 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
2021:20:33.641 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching···
2021:20:36.655 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
2021:20:36.655 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
2021:20:36.656 [main] ERROR pers.duxiaotou.ProgrammaticSpringRetryApp - 调用谷歌搜索接口失败,执行兜底逻辑
@Service
@Slf4j
public class GoogleSearchServiceImpl implements GoogleSearchService {
private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSearchServiceImpl.class);
@Retryable(
maxAttempts = 3,
backoff = @Backoff(value = 500),
include = {HttpConnectTimeoutException.class},
stateful = false)
@Override
public void search(String content) throws IOException, InterruptedException, URISyntaxException {
LOGGER.info(">>>>>> searching··· ");
URL url = new URL("https://www.google.com/complete/search?q=" + content);
URI uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null);
HttpRequest request = HttpRequest
.newBuilder()
.uri(uri)
.build();
HttpClient client = HttpClient
.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(3))
.build();
HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
}
@Recover
public void recover(Throwable throwable, String content) {
LOGGER.error("调用谷歌搜索接口失败,执行兜底逻辑");
}
}
@SpringBootApplication
@EnableRetry
public class DuXiaoTouSpringAopApp {
public static void main(String[] args) throws Throwable {
ConfigurableApplicationContext applicationContext = SpringApplication.run(DuXiaoTouSpringAopApp.class, args);
GoogleSearchService googleSearchService = applicationContext.getBean(GoogleSearchService.class);
googleSearchService.search("spring retry");
}
}
执行结果
2021-06-14 21:39:01.321 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry: count=0
2021-06-14 21:39:01.321 INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : >>>>>> searching···
2021-06-14 21:39:04.907 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2021-06-14 21:39:04.907 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry: count=1
2021-06-14 21:39:04.908 INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : >>>>>> searching···
2021-06-14 21:39:08.436 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2021-06-14 21:39:08.436 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry: count=2
2021-06-14 21:39:08.437 INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : >>>>>> searching···
2021-06-14 21:39:11.449 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2021-06-14 21:39:11.449 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
2021-06-14 21:39:11.450 ERROR 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : 调用谷歌搜索接口失败,执行兜底逻辑
将@Retryable
注解添加到目标对象GoogleSearchServiceImpl中search()方法后,当search()方法抛出HttpConnectTimeoutException异常时,Spring Retry会自动为调用方重复执行search()方法。那Spring Retry究竟是如何为调用方提供自动重试能力的呢?众所周知,获取重试能力的关键在于@EnableRetry
注解,该注解可以开启Spring Retry开关。那@EnableRetry注解可能就是解开谜团的突破口了。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {
boolean proxyTargetClass() default false;
}
在@Retryable源码中,我们发现了@EnableAspectJAutoProxy
注解的身影,这说明Spring Retry是基于Spring AOP为目标对象生成代理对象从而拓展出重试能力的!
关于Spring AOP的基础知识请参考《Spring AOP,从入门到进阶》。BeanPostProcessor
接口是Spring中常用的IoC容器拓展点;有了BeanPostProcessor,任何人都可以在Bean初始化前后对其进行个性化改造,甚至使用代理对象将Bean替换;在Spring AOP中,扮演BeanPostProcessor角色的是AbstractAutoProxyCreator
抽象类,其主要用于创建代理对象。
public abstract class AbstractAutoProxyCreator implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
return wrapIfNecessary(bean, beanName);
}
return bean;
}
protected Object wrapIfNecessary(Object bean, String beanName) {
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
return proxy;
}
return bean;
}
}
从上述AbstractAutoProxyCreator源码中,我们发现:只有当某一Bean获取到PointcutAdvisor
时(specificInterceptors数组非空),才会为该Bean生成代理对象。显然,Spring Retry肯定定义了一个PointcutAdvisor,要不然谁会闲的无聊帮你定义哦!
RetryConfiguration
就是Spring Retry自己定义的PointcutAdvisor,它主要负责构建Advice
和Pointcut
,RetryConfiguration源码如下:
@Component
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware, InitializingBean {
private Advice advice;
private Pointcut pointcut;
@Override
public void afterPropertiesSet() throws Exception {
Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
retryableAnnotationTypes.add(Retryable.class);
this.pointcut = buildPointcut(retryableAnnotationTypes);
this.advice = buildAdvice();
}
protected Advice buildAdvice() {
return new AnnotationAwareRetryOperationsInterceptor();
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
Pointcut filter = new RetryConfiguration.AnnotationClassOrMethodPointcut(retryAnnotationType);
if (result == null) {
result = new ComposablePointcut(filter);
}
else {
result.union(filter);
}
}
return result;
}
}
至此,我们搞明白了一点,即只要目标对象含有@Retryable注解,那么就一定可以获取到RetryConfiguration这一PointcutAdvisor,继而为目标对象生成代理对象!
接下来,我们需要搞清楚RetryConfiguration
在构建Advice时所使用的AnnotationAwareRetryOperationsInterceptor
有何意义?AnnotationAwareRetryOperationsInterceptor继承自MethodInterceptor
,无疑invoke()
是核心逻辑。
public class AnnotationAwareRetryOperationsInterceptor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// invocation.getThis()获取到的为目标对象,如:GoogleSearchServiceImpl
// invocation.getMethod()获取到的为目标方法,如:search()
// 若是无状态重试,则委派RetryOperationsInterceptor进行处理
// 若是有状态重试,则委派StatefulRetryOperationsInterceptor进行处理
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
return delegate.invoke(invocation);
}
else {
return invocation.proceed();
}
}
/**
* 根据目标对象和目标方法获取对应的重试拦截器
*/
private MethodInterceptor getDelegate(Object target, Method method) {
MethodInterceptor interceptor = NULL_INTERCEPTOR;
Retryable retryable = findAnnotationOnTarget(target, method, Retryable.class);
if (retryable != null) {
if (StringUtils.hasText(retryable.interceptor())) {
interceptor = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
}
else if (retryable.stateful()) {
interceptor = getStatefulInterceptor(target, method, retryable);
}
else {
interceptor = getStatelessInterceptor(target, method, retryable);
}
}
return interceptor;
}
/**
* 无状态重试拦截器:RetryOperationsInterceptor
*/
private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {
RetryTemplate template = createTemplate(retryable.listeners());
template.setRetryPolicy(getRetryPolicy(retryable));
template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label())
.recoverer(getRecoverer(target, method)).build();
}
/**
* 有状态重试拦截器:StatefulRetryOperationsInterceptor
*/
private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {
RetryTemplate template = createTemplate(retryable.listeners());
template.setRetryContextCache(this.retryContextCache);
CircuitBreaker circuit = findAnnotationOnTarget(target, method, CircuitBreaker.class);
if (circuit != null) {
RetryPolicy policy = getRetryPolicy(circuit);
CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
breaker.setOpenTimeout(getOpenTimeout(circuit));
breaker.setResetTimeout(getResetTimeout(circuit));
template.setRetryPolicy(breaker);
template.setBackOffPolicy(new NoBackOffPolicy());
String label = circuit.label();
if (!StringUtils.hasText(label)) {
label = method.toGenericString();
}
return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit"))
.retryOperations(template).recoverer(getRecoverer(target, method)).label(label).build();
}
RetryPolicy policy = getRetryPolicy(retryable);
template.setRetryPolicy(policy);
template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
String label = retryable.label();
return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator)
.newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label)
.recoverer(getRecoverer(target, method)).build();
}
}
AnnotationAwareRetryOperationsInterceptor的invoke()主要逻辑有:
RetryOperationsInterceptor
和StatefulRetryOperationsInterceptor
同样继承自MethodInterceptor接口,继续跟进invoke()方法。RetryOperationsInterceptor
public class RetryOperationsInterceptor implements MethodInterceptor {
public Object invoke(final MethodInvocation invocation) throws Throwable {
String name;
if (StringUtils.hasText(label)) {
name = label;
} else {
name = invocation.getMethod().toGenericString();
}
final String label = name;
// 封装RetryCallback回调实例
// RetryCallback中doWithRetry()方法内最终将执行到目标对象的目标方法
RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
invocation, label) {
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute(RetryContext.NAME, label);
return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
}
};
if (recoverer != null) {
RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), recoverer);
return this.retryOperations.execute(retryCallback, recoveryCallback);
}
return this.retryOperations.execute(retryCallback);
}
}
StatefulRetryOperationsInterceptor
public class StatefulRetryOperationsInterceptor implements MethodInterceptor {
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Object[] args = invocation.getArguments();
Object defaultKey = Arrays.asList(args);
if (args.length == 1) {
defaultKey = args[0];
}
Object key = createKey(invocation, defaultKey);
RetryState retryState = new DefaultRetryState(key,
this.newMethodArgumentsIdentifier != null && this.newMethodArgumentsIdentifier.isNew(args),
this.rollbackClassifier);
RetryCallback<Object, Throwable> retryCallback = new StatefulMethodInvocationRetryCallback<Object, Throwable>(
invocation, label) {
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute(RetryContext.NAME, label);
return this.invocation.proceed();
}
};
if (recoverer != null) {
ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(args, this.recoverer, retryState);
return this.retryOperations.execute(retryCallback, recoveryCallback, retryState);
}
return this.retryOperations.execute(retryCallback, null, retryState);
}
}
从上面RetryOperationsInterceptor和StatefulRetryOperationsInterceptor的源码来看,二者invoke()实现逻辑基本一致,最终都是手动调用RetryTemplate#execute()
方法。
最后,RetryTemplate中的若干execute()重载方法的核心逻辑均落在doExecute()
方法,解读如下:
public class RetryTemplate implements RetryOperations {
private static final String GLOBAL_STATE = "state.global";
protected <T, E extends Throwable> T doExecute(
RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
// 重试策略
RetryPolicy retryPolicy = this.retryPolicy;
// 退避策略
BackOffPolicy backOffPolicy = this.backOffPolicy;
// 对于无状态重试,直接重新生成RetryContext实例
// 对于有状态重试,则从RetryContextCache获取已有RetryContext实例
RetryContext context = open(retryPolicy, state);
// 上一次执行目标方法时抛出异常
Throwable lastException = null;
// 重试是否已耗尽
boolean exhausted = false;
try {
// 在执行目标方法前,执行重试监听器的open()方法
boolean running = doOpenInterceptors(retryCallback, context);
if (!running) {
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
// 生成BackOffContext实例,其内维护的退避策略相关信息,比如重试间隔值
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
if (backOffContext == null) {
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
// while循环
// 委托具体RetryPolicy来进行重试判决
// context.isExhaustedOnly()一般总是false
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
lastException = null;
// 执行目标方法,即业务方法,同步回调
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
// 更新lastException
lastException = e;
// 注册异常
// 首先,委托RetryPolicy的registerThrowable()方法注册异常,如:记录重试次数
// 然后,若是有状态重试,则注册当前RetryContext,即将其添加到RetryContextCache
registerThrowable(retryPolicy, state, context, e);
// 执行重试监听器的OnError()方法
doOnErrorInterceptors(retryCallback, context, e);
// 当前执行目标方法失败,会执行退避策略,但前提是可以继续重试
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
// 执行退避策略,主要依赖Thread.sleep()方法来模拟重试间隔
backOffPolicy.backOff(backOffContext);
}
// 是否应该立即重新抛出异常
// 对于无状态异常,是不会重新抛出异常的
// 对于有状态异常,则委托RetryState进行判断是否需要重新抛出异常:
// state != null && state.rollbackFor(context.getLastThrowable())
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
// 抛出异常
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
// 能执行到这里,说明目标方法执行一定是执行失败了的,但并不需要重新抛出异常,即shouldRethrow()为false,
// shouldRethrow()为false,有两种情况:无状态重试和有状态重试;
// 这部分代码主要用于退出while循环,而当前只有在熔断器重试策略中,RetryContext中才会有GLOBAL_STATE哦,
// 这也说明在熔断器模式下,调用方每次调用目标方法,最终只会执行一次,会在此退出while循环啊。
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
// 执行至此,说明重试已耗尽了
exhausted = true;
// 重试耗尽,若有兜底方法,则执行兜底逻辑;否则抛出异常
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
// 执行重试监听器的close()方法
doCloseInterceptors(retryCallback, context, lastException);
}
}
}
Spring Retry声明式重试执行流程