本文主要研究一下eureka的PeerAwareInstanceRegistryImpl
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
//......
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
//......
}
这里主要关注PeerAwareInstanceRegistry,EurekaClient的配置见EurekaClientAutoConfiguration
spring-cloud-netflix-eureka-server-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/server/InstanceRegistry.java
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
implements ApplicationContextAware {
//......
private void handleCancelation(String appName, String id, boolean isReplication) {
log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}
private void handleRegistration(InstanceInfo info, int leaseDuration,
boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
+ ", leaseDuration " + leaseDuration + ", isReplication "
+ isReplication);
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
isReplication));
}
private void log(String message) {
if (log.isDebugEnabled()) {
log.debug(message);
}
}
private void publishEvent(ApplicationEvent applicationEvent) {
this.ctxt.publishEvent(applicationEvent);
}
}
这个继承了PeerAwareInstanceRegistryImpl,重写方法主要是在执行之前发布相关的事件
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
//......
}
这个类实现了InstanceRegistry的openForTraffic、shutdown、statusUpdate、deleteStatusOverride等方法,以及PeerAwareInstanceRegistry的init、syncUp、shouldAllowAccess、register(InstanceInfo info, boolean isReplication)、statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication)等方法。 而AbstractInstanceRegistry主要是实现了LeaseManager的关于register、cancel、renew、evict等基本操作,以及LookupService的一些查询操作 PeerAwareInstanceRegistryImpl主要是在AbstractInstanceRegistry的基础上,新增了peer相关的处理以及RenewalThreshold的更新
如下图所示,PeerAwareInstanceRegistryImpl重写了cancel、register、renew、statusUpdate、deleteStatusOverride方法,首先调用super的操作,然后调用replicateToPeers
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
这个方法就是把instance信息复制给eureka server peers。其中isReplication用来标识这次请求是不是其他节点复制过来的。 可以看到如果peerEurekaNodes为空,或者isReplication为true的话,则不继续往下
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
这里根据不同的Action来调用PeerEurekaNode的不同方法
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.java
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/transport/JerseyReplicationClient.java
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {
private static final Logger logger = LoggerFactory.getLogger(JerseyReplicationClient.class);
private final EurekaJerseyClient jerseyClient;
private final ApacheHttpClient4 jerseyApacheClient;
public JerseyReplicationClient(EurekaJerseyClient jerseyClient, String serviceUrl) {
super(jerseyClient.getClient(), serviceUrl);
this.jerseyClient = jerseyClient;
this.jerseyApacheClient = jerseyClient.getClient();
}
@Override
protected void addExtraHeaders(Builder webResource) {
webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}
//......
}
这里显示设置了PeerEurekaNode.HEADER_REPLICATION的值为true,也就是标识从这个client触发的请求都是属于replication行为的,告诉目标eureka server不用再replicate避免死循环。
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java
/**
* Handles cancellation of leases for this particular instance.
*
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes.
* @return response indicating whether the operation was a success or
* failure.
*/
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
}
}
这里以cancel为例,再串一下replication。这里可以发现把PeerEurekaNode.HEADER_REPLICATION的值传递到registry.cancel方法,而这个registry就是PeerAwareInstanceRegistryImpl,这样就循环串起来了。
PeerAwareInstanceRegistryImpl主要是在AbstractInstanceRegistry的基础上,新增了peer相关的处理以及RenewalThreshold的更新。replicate给其他peer的时候传递了PeerEurekaNode.HEADER_REPLICATION为true的header,表示这个是replicate请求,这样接收方就不会再replicate给他的peer。