dubbo版本2.5.3
我们这里以zookeeper作为注册中心为例说明。
这里说的集群,可以理解为,一个接口服务对应有多个提供者。 在dubbo的调用方(reference)看来,每个提供方(service)对应一个invoker。 关于一个调用方对应多个提供方的场景大概包括三大类: 1,者调者订阅一个注册中心,此注册中心,同一个服务有多个提供者(以不同机器,端口,版本等发布的服务) 2,者调者订阅多个注册中心的服务,每个注册中心都有引用的服务的提供者(一个或者多个)。 3,调用方,通过url配置,提供多个提供者地址,多个地址以分号隔开。 1,2是同一类场景,3是直连场景,这两中场景是互斥,也就是用户配置了reference的url属性,dubbo就不会再订阅注册中心。
下面通过代码分析下,这三种场景的集群容错 客户端订阅可以看ReferenceConfig类的createProxy方法里以下代码
if (isJvmRefer) {//引用本地服务,只返回一个exporter不会有集群。
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {//应用远程服务
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {//用户自定多个直连服务
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {//用户自定多个注册中心
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {//调用方订阅一个注册中心,或者自定一个直连服务(直连的这种情况不考虑集群,只有一个提供者)
//一个注册中心时,这个refprotocol自适应后是RegistryProtocol
//一个直连者时,这个refprotocol自适应后是DubboProtocol(如果是duboo协议)
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {//调用方订阅多个注册中心,或者多个直连地址
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster 容错策略
// 对于订阅多个注册中心的,这里其实有两层的容错机制,只是第一层,被强制设置为AvailableCluster 容错策略
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));//cluster是通过spi机制注入的自适应adaptive实现,场景2执行逻辑
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));//cluster是通过spi机制注入的自适应adaptive实现,场景3执行逻辑
}
}
}
通过代码我们看到,对于场景1,引用一个注册中心的场景,会执行 invoker = refprotocol.refer(interfaceClass, urls.get(0));代码
通过代码调试,可以发现,refprotocol.refer会调用RegistryProtocol的refer方法最终进入doRefer方法, 会执行如下代码
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);//cluster是通过spi机制注入的自适应adaptive实现。此时的directory是RegistryDirectory类型
}
三种场景实际上,都执行了dubbo SPI机制生成的adaptive的Cluster实现代码 通过dubbo打印日志,可以看到adaptive的Cluster实现代码如下
package com.alibaba.dubbo.rpc.cluster;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");//可以看到,通过url里的cluster键值获取容错机制,url中没有指定cluster键值,dubbo默认是用failover集群容错策略
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
到此,我们可以看到对于多注册中心的,第一层容错机制被强制设置为available, 然后第二层,就和单个注册中心多服务提供者集群容错机制一样了,即默认为failover容错机制。这里看下这两种容错机制的代码实现 1,failover容错机制 通过spi机制我们找到Cluster failover扩展FailoverCluster类是这样实现的
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
接着看FailoverClusterInvoker类,先看它的父类AbstractClusterInvoker,这个类实现了Invoker接口:
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
//这里是获取负载均衡策略
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);//回调子类的doInvoke方法
}
然后再回到子类看doInvoke方法:
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
//获取重试次数 +1是因为第一次调用不算重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
//重新检查一下
checkInvokers(copyinvokers, invocation);
}
//这里是通过负载均衡策略获取下一个服务提供者
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List)invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.如果是业务异常,则不再重试,直接抛出异常
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());//记录调用失败信息
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
}
通过代码可以看到, failvoer集群容错机制,总的逻辑是,以方法重复次数为限制,每次调用如果失败, 就利用负责均衡策略获取下一个提供者(invoker),直到调用成功,或者最后方法超限,抛出异常, 其中中间如果有业务异常,则不再重试,直接抛出异常。
2,available集群容错机制,我们找到AvailableCluster类,它只有一个方法
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
//它没通过扩展AbstractClusterInvoker抽象类,而是直接实现它,它没用负载均衡策略,而是简单选择一个可达的服务
return new AbstractClusterInvoker<T>(directory) {
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {//获取第一个,可达的服务提供方,
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
通过代码可以看到, available集群容错机制,则是简单的调用第一个可到达的服务。都不可达是,抛出异常
最后 dubbo本身还有其他集群容错的扩展实现
dubbo集群容错策略的代码分析2