首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊eureka的PeerAwareInstanceRegistryImpl

聊聊eureka的PeerAwareInstanceRegistryImpl

作者头像
code4it
发布于 2018-09-17 08:40:54
发布于 2018-09-17 08:40:54
1.1K00
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下eureka的PeerAwareInstanceRegistryImpl

EurekaServerAutoConfiguration

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    //......
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
    //......
}

这里主要关注PeerAwareInstanceRegistry,EurekaClient的配置见EurekaClientAutoConfiguration

InstanceRegistry

spring-cloud-netflix-eureka-server-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/server/InstanceRegistry.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {
        //......
    private void handleCancelation(String appName, String id, boolean isReplication) {
        log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
        publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
    }

    private void handleRegistration(InstanceInfo info, int leaseDuration,
            boolean isReplication) {
        log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                + ", leaseDuration " + leaseDuration + ", isReplication "
                + isReplication);
        publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                isReplication));
    }

    private void log(String message) {
        if (log.isDebugEnabled()) {
            log.debug(message);
        }
    }

    private void publishEvent(ApplicationEvent applicationEvent) {
        this.ctxt.publishEvent(applicationEvent);
    }
}

这个继承了PeerAwareInstanceRegistryImpl,重写方法主要是在执行之前发布相关的事件

PeerAwareInstanceRegistryImpl

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    //......
}

这个类实现了InstanceRegistry的openForTraffic、shutdown、statusUpdate、deleteStatusOverride等方法,以及PeerAwareInstanceRegistry的init、syncUp、shouldAllowAccess、register(InstanceInfo info, boolean isReplication)、statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication)等方法。 而AbstractInstanceRegistry主要是实现了LeaseManager的关于register、cancel、renew、evict等基本操作,以及LookupService的一些查询操作 PeerAwareInstanceRegistryImpl主要是在AbstractInstanceRegistry的基础上,新增了peer相关的处理以及RenewalThreshold的更新

replicateToPeers

如下图所示,PeerAwareInstanceRegistryImpl重写了cancel、register、renew、statusUpdate、deleteStatusOverride方法,首先调用super的操作,然后调用replicateToPeers

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

这个方法就是把instance信息复制给eureka server peers。其中isReplication用来标识这次请求是不是其他节点复制过来的。 可以看到如果peerEurekaNodes为空,或者isReplication为true的话,则不继续往下

replicateInstanceActionsToPeers

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

这里根据不同的Action来调用PeerEurekaNode的不同方法

PeerEurekaNode

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java

HttpReplicationClient

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/transport/JerseyReplicationClient.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {

    private static final Logger logger = LoggerFactory.getLogger(JerseyReplicationClient.class);

    private final EurekaJerseyClient jerseyClient;
    private final ApacheHttpClient4 jerseyApacheClient;

    public JerseyReplicationClient(EurekaJerseyClient jerseyClient, String serviceUrl) {
        super(jerseyClient.getClient(), serviceUrl);
        this.jerseyClient = jerseyClient;
        this.jerseyApacheClient = jerseyClient.getClient();
    }

    @Override
    protected void addExtraHeaders(Builder webResource) {
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }

//......
}

这里显示设置了PeerEurekaNode.HEADER_REPLICATION的值为true,也就是标识从这个client触发的请求都是属于replication行为的,告诉目标eureka server不用再replicate避免死循环。

InstanceResource

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * Handles cancellation of leases for this particular instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }

这里以cancel为例,再串一下replication。这里可以发现把PeerEurekaNode.HEADER_REPLICATION的值传递到registry.cancel方法,而这个registry就是PeerAwareInstanceRegistryImpl,这样就循环串起来了。

小结

PeerAwareInstanceRegistryImpl主要是在AbstractInstanceRegistry的基础上,新增了peer相关的处理以及RenewalThreshold的更新。replicate给其他peer的时候传递了PeerEurekaNode.HEADER_REPLICATION为true的header,表示这个是replicate请求,这样接收方就不会再replicate给他的peer。

doc

  • 聊聊springcloud的EurekaClientAutoConfiguration
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Eureka应用注册与集群数据同步源码解析
在之前的EurekaClient自动装配及启动流程解析一文中我们提到过,在构造DiscoveryClient类时,会把自身注册到服务端,本文就来分析一下这个注册流程
Java学习录
2019/10/21
5140
深入理解Eureka之源码解析
方志朋
2017/12/29
1.3K0
深入理解Eureka之源码解析
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
也就是当有EurekaServerMarkerConfiguration.Marker.class时,才会注入
JavaEdge
2020/05/27
7430
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
【一起学源码-微服务】Nexflix Eureka 源码十二:EurekaServer集群模式源码分析
哈哈 转眼间都2020年了,这个系列的文章从12.17 一直写到现在,也是不容易哈,每天持续不断学习,输出博客,这一段时间确实收获很多。
一枝花算不算浪漫
2020/02/11
4480
聊聊spring cloud的DefaultEurekaServerContext
本文主要研究一下spring cloud的DefaultEurekaServerContext
code4it
2018/09/17
6510
Eureka Server过源码
Eureka Server启动 入口EurekaServerInitializerConfiguration @Configuration class EurekaServerInitializerConfiguration implements ServletContextAware @Override public void start() { new Thread(new Runnable() { @Override public void run() {
叔叔
2018/04/11
6270
注册中心 Eureka 源码解析 —— Eureka-Server 集群同步
Eureka-Server 可以使用直接配置所有节点的服务地址,或者基于 DNS 配置。推荐阅读:《Spring Cloud构建微服务架构(六)高可用服务注册中心》 。
芋道源码
2018/08/17
1.6K0
注册中心 Eureka 源码解析 —— Eureka-Server 集群同步
聊聊eureka instance的lastDirtyTimestamp
本文主要研究一下eureka instance的lastDirtyTimestamp
code4it
2018/09/17
1.4K0
聊聊eureka的TaskDispatcher
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/ReplicationTask.java
code4it
2018/09/17
8170
Eureka Server之间的注册表信息同步
前言 Eureka 作为一个服务注册中心,Eureka Server必然是可以通过集群的方式进行部署,但是分布式系统中一个很关键的点就是数据的一致性,多节点部署的Eureka Server必然涉及到不
aoho求索
2018/04/03
3.4K0
Eureka Server之间的注册表信息同步
SpringCloud源码:服务端分析(二)- EurekaServer分析
从昨日的两篇文章:SpringCloud源码:客户端分析(一)- SpringBootApplication注解类加载流程、SpringCloud源码:客户端分析(二)- 客户端源码分析。
后台技术汇
2024/10/05
1800
SpringCloud源码:服务端分析(二)- EurekaServer分析
Eureka 源码分析之 Eureka Server
上一篇文章《Eureka 源码分析之 Eureka Client》 通过源码知道 ,eureka Client 是通过 http rest来 与 eureka server 交互,实现 注册服务,续约服务,服务下线 等。本篇探究下eureka server。
程序员果果
2019/06/19
6750
注册中心 Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态
这里要注意下,不是应用实例的状态( status ),而是覆盖状态( overridestatus ) 。代码如下:
芋道源码
2018/07/31
9110
注册中心 Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态
聊聊eureka的renewal
eureka client在实例化的时候注册了一个定时任务,每隔renewalIntervalInSecs,向eureka server发送一次renewal。具体详见聊聊eureka client的HeartbeatThread
code4it
2018/09/17
9660
注册中心 Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态
这里要注意下,不是应用实例的状态( status ),而是覆盖状态( overridestatus ) 。代码如下:
芋道源码
2018/07/31
1K0
注册中心 Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态
eureka监听各服务状态,下线、重连等,并做相应的处理
在一些场景下,我们需要监听eureka服务中心的一些状态,譬如某个微服务挂掉了,我们希望能监听到,并给管理员发送邮件通知。
天涯泪小武
2019/01/17
3.5K0
注册中心 Eureka 源码解析 —— 应用实例注册发现 (三)之下线
本文主要分享 Eureka-Client 向 Eureka-Server 下线应用实例的过程。
芋道源码
2018/07/31
6310
注册中心 Eureka 源码解析 —— 应用实例注册发现 (三)之下线
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
所以@EnableEurekaServer就是个开关,只要写了该注解,spring 就会帮你把EurekaServerAutoConfiguration类注入进来。
JavaEdge
2020/05/12
1.5K0
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
注册中心 Eureka 源码解析 —— 注册表 InstanceRegistry 类关系
本文主要基于 Eureka 1.8.X 版本 1. 概述 2. 类图 3. LookupService 4. LeaseManager 5. InstanceRegistry 6. AbstractI
芋道源码
2018/06/13
7580
【一起学源码-微服务】Nexflix Eureka 源码四:EurekaServer启动之完成上下文构建及EurekaServer总结
上篇文章已经介绍了 Eureka Server上下文创建相关的Eureka Client逻辑,这一部分还是比较复杂的。接下来就讲解下Eureka Server上下文初始化最后的部分,然后加上整个Eureka Server启动的总结。
一枝花算不算浪漫
2019/12/30
4980
【一起学源码-微服务】Nexflix Eureka 源码四:EurekaServer启动之完成上下文构建及EurekaServer总结
推荐阅读
相关推荐
Eureka应用注册与集群数据同步源码解析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档